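Draft: Agent Design Patterns - Reflection
Learn how to implement self-improving AI agents on Vercel's serverless platform using LangChain, LangGraph, and TypeScript.
Mental Model: The Code Review Analogy
Think of the reflection pattern as a pull request review. When you submit code, a reviewer (the critic agent) inspects it and leaves feedback, and you (the producer agent) revise based on that feedback. The cycle repeats until the code meets the quality bar or the merge deadline arrives. AI agents apply the same principle through a structured feedback loop that enables iterative self-improvement. Just as code review catches bugs and raises quality, reflection helps an agent identify and correct its own mistakes, producing more accurate and reliable output.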
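The review loop can be sketched without any framework. The following is a minimal, dependency-free illustration rather than the LangGraph implementation: `produce`, `critique`, and `reflect` are hypothetical stand-ins for model calls, and the critic is a toy rule that approves once the draft mentions an example.

```typescript
// Hypothetical stand-ins for LLM calls; a real agent would call a model here.
type Critique = { approved: boolean; feedback: string };

function produce(task: string, feedback: string[]): string {
  // Each round of feedback is folded into the next draft.
  return feedback.length === 0
    ? `Draft for: ${task}`
    : `Draft for: ${task} (revised: ${feedback.join("; ")})`;
}

function critique(draft: string): Critique {
  // Toy rule: approve only once the draft mentions an example.
  return draft.includes("example")
    ? { approved: true, feedback: "" }
    : { approved: false, feedback: "add an example" };
}

function reflect(task: string, maxIterations = 3) {
  const feedback: string[] = [];
  let draft = produce(task, feedback);
  let iterations = 0;
  while (iterations < maxIterations) {
    const review = critique(draft);
    iterations += 1;
    if (review.approved) break; // quality bar met
    feedback.push(review.feedback);
    draft = produce(task, feedback); // revise using the feedback
  }
  return { draft, iterations };
}

console.log(reflect("explain closures")); // approved on the second review
```

The loop has two exits, approval and the iteration cap, which correspond to the "quality bar" and "merge deadline" in the analogy.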
Basic Example: A Self-Reflecting Agent
1. Create the Reflection State Graph
// lib/agents/reflection-basic.ts
import { StateGraph, END, START, Annotation } from "@langchain/langgraph";
import { BaseMessage, HumanMessage, AIMessage } from "@langchain/core/messages";
import { ChatGoogleGenerativeAI } from "@langchain/google-genai";
import { ChatPromptTemplate, MessagesPlaceholder } from "@langchain/core/prompts";
import { take } from "es-toolkit";
const ReflectionState = Annotation.Root({
messages: Annotation<BaseMessage[]>({
reducer: (x, y) => x.concat(y),
}),
reflectionCount: Annotation<number>({
reducer: (x, y) => y ?? x,
default: () => 0,
}),
});
const model = new ChatGoogleGenerativeAI({
modelName: "gemini-2.5-flash",
temperature: 0.7,
});
const generatePrompt = ChatPromptTemplate.fromMessages([
["system", "You are an expert essay writer. Generate a response to the user's request."],
new MessagesPlaceholder("messages"),
]);
const reflectPrompt = ChatPromptTemplate.fromMessages([
["system", `You are a writing critic. Review the essay and provide specific, actionable feedback.
If the essay is excellent, respond with only "APPROVED".
Otherwise, list 2-3 specific improvements needed.`],
new MessagesPlaceholder("messages"),
]);
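This sets up the basic state: a message history plus a reflection counter. The state tracks the conversation and the number of reflection cycles completed so far.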
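To see why each state key carries a reducer, here is a small framework-free sketch of the merge behavior. `applyUpdate` and the `reducers` table are hypothetical names for illustration, not LangGraph internals: a node returns only a partial update, and the reducer decides how that update combines with the existing value.

```typescript
// Hypothetical miniature of channel reducers: each state key has a reducer
// that merges the current value with a node's partial update.
type State = { messages: string[]; reflectionCount: number };

const reducers: {
  [K in keyof State]: (current: State[K], update: State[K]) => State[K];
} = {
  messages: (x, y) => x.concat(y), // append new messages to the history
  reflectionCount: (_x, y) => y,   // the latest value wins
};

function applyUpdate(state: State, update: Partial<State>): State {
  const next = { ...state };
  if (update.messages !== undefined) {
    next.messages = reducers.messages(state.messages, update.messages);
  }
  if (update.reflectionCount !== undefined) {
    next.reflectionCount = reducers.reflectionCount(
      state.reflectionCount,
      update.reflectionCount
    );
  }
  return next;
}

let state: State = { messages: ["user: write an essay"], reflectionCount: 0 };
state = applyUpdate(state, { messages: ["ai: first draft"] });
state = applyUpdate(state, {
  messages: ["user: Feedback: tighten the intro"],
  reflectionCount: 1,
});
console.log(state.messages.length, state.reflectionCount); // 3 1
```

Because `messages` appends, a node can return just its new message instead of rebuilding the whole history.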
2. Implement the Generate and Reflect Nodes
// lib/agents/reflection-basic.ts (continued)
async function generateNode(state: typeof ReflectionState.State) {
const chain = generatePrompt.pipe(model);
const response = await chain.invoke({
messages: state.messages
});
return {
messages: [response],
};
}
async function reflectNode(state: typeof ReflectionState.State) {
const chain = reflectPrompt.pipe(model);
// Last user message and AI response. take() selects from the front of the array, so slice(-2) is used for the tail.
const lastMessages = state.messages.slice(-2);
const critique = await chain.invoke({
messages: lastMessages
});
return {
messages: [new HumanMessage(`Feedback: ${critique.content}`)],
reflectionCount: state.reflectionCount + 1,
};
}
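function shouldContinue(state: typeof ReflectionState.State) {
// This runs right after "generate", so the last message is the new draft and
// the critic's verdict, if any, is the message just before it.
const previous = state.messages[state.messages.length - 2];
const previousText = previous?.content?.toString() ?? "";
const approved = previousText.startsWith("Feedback:") && previousText.includes("APPROVED");
// Stop once the critic has approved or the reflection limit is reached
if (approved || state.reflectionCount >= 3) {
return END;
}
return "reflect";
}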
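The generate node produces the content and the reflect node critiques it. The shouldContinue function implements the stopping logic: the loop ends on approval or when the iteration limit is reached.
3. Build the Workflow Graph
// lib/agents/reflection-basic.ts (continued)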
export function createReflectionAgent() {
const workflow = new StateGraph(ReflectionState)
.addNode("generate", generateNode)
.addNode("reflect", reflectNode)
.addEdge(START, "generate")
.addConditionalEdges("generate", shouldContinue, {
reflect: "reflect",
[END]: END,
})
.addEdge("reflect", "generate");
return workflow.compile();
}
The workflow is assembled with a conditional edge that controls the flow of the reflection loop.
4. Create the API Route
// app/api/reflection/route.ts
import { createReflectionAgent } from "@/lib/agents/reflection-basic";
import { HumanMessage } from "@langchain/core/messages";
import { NextResponse } from "next/server";
export const runtime = "nodejs";
export const maxDuration = 60;
export async function POST(req: Request) {
try {
const { prompt } = await req.json();
const agent = createReflectionAgent();
const result = await agent.invoke({
messages: [new HumanMessage(prompt)],
reflectionCount: 0,
});
// Extract the final refined output (the last AI message)
const finalOutput = result.messages
.filter((m: any) => m._getType() === "ai")
.pop()?.content;
return NextResponse.json({
output: finalOutput,
iterations: result.reflectionCount,
messages: result.messages.map((m: any) => ({
type: m._getType(),
content: m.content,
})),
});
} catch (error) {
console.error("Reflection error:", error);
return NextResponse.json(
{ error: "Reflection process failed" },
{ status: 500 }
);
}
}
The route handles the HTTP request and runs the reflection agent with basic error handling.
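The route above passes `prompt` to the agent without checking it. A minimal validation sketch follows; `parsePromptBody`, `ParseResult`, and the 4000-character limit are assumptions for illustration, not part of the original route.

```typescript
// Hypothetical helper: validate the parsed JSON body before invoking the agent.
type ParseResult =
  | { ok: true; prompt: string }
  | { ok: false; error: string };

const MAX_PROMPT_LENGTH = 4000; // assumed limit; tune for your use case

function parsePromptBody(body: unknown): ParseResult {
  if (typeof body !== "object" || body === null) {
    return { ok: false, error: "Body must be a JSON object" };
  }
  const prompt = (body as Record<string, unknown>).prompt;
  if (typeof prompt !== "string" || prompt.trim() === "") {
    return { ok: false, error: "prompt must be a non-empty string" };
  }
  if (prompt.length > MAX_PROMPT_LENGTH) {
    return { ok: false, error: `prompt must be at most ${MAX_PROMPT_LENGTH} characters` };
  }
  return { ok: true, prompt: prompt.trim() };
}

console.log(parsePromptBody({ prompt: "  Write about tides  " }));
console.log(parsePromptBody({ prompt: 42 }));
```

In the route, a failed result could be returned as a 400 response before any model call is made, which avoids spending tokens on malformed requests.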
5. Build the Frontend Component
// components/ReflectionDemo.tsx
"use client";
import { useState } from "react";
import { useMutation } from "@tanstack/react-query";
import { groupBy } from "es-toolkit";
interface ReflectionResult {
output: string;
iterations: number;
messages: Array<{ type: string; content: string }>;
}
export default function ReflectionDemo() {
const [prompt, setPrompt] = useState("");
const [showProcess, setShowProcess] = useState(false);
const reflection = useMutation({
mutationFn: async (userPrompt: string) => {
const res = await fetch("/api/reflection", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ prompt: userPrompt }),
});
if (!res.ok) throw new Error("Reflection failed");
return res.json() as Promise<ReflectionResult>;
},
});
const handleSubmit = (e: React.FormEvent) => {
e.preventDefault();
if (prompt.trim()) {
reflection.mutate(prompt);
}
};
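// Group messages by iteration for display. The es-toolkit groupBy callback
// receives only the item (no index), so the pairs are built with forEach.
const messagesByIteration: Record<string, ReflectionResult["messages"]> = {};
reflection.data?.messages.forEach((msg, index) => {
const key = Math.floor(index / 2).toString();
(messagesByIteration[key] ??= []).push(msg);
});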
return (
<div className="card w-full bg-base-100 shadow-xl">
<div className="card-body">
<h2 className="card-title">Reflection Agent Demo</h2>
<form onSubmit={handleSubmit}>
<textarea
className="textarea textarea-bordered w-full"
placeholder="Enter your writing prompt..."
value={prompt}
onChange={(e) => setPrompt(e.target.value)}
rows={3}
disabled={reflection.isPending}
/>
<div className="card-actions justify-between mt-4">
<label className="label cursor-pointer">
<span className="label-text mr-2">Show process</span>
<input
type="checkbox"
className="checkbox"
checked={showProcess}
onChange={(e) => setShowProcess(e.target.checked)}
/>
</label>
<button
type="submit"
className="btn btn-primary"
disabled={reflection.isPending || !prompt.trim()}
>
{reflection.isPending ? (
<>
<span className="loading loading-spinner"></span>
Reflecting...
</>
) : "Generate"}
</button>
</div>
</form>
{reflection.data && (
<div className="mt-6 space-y-4">
<div className="stats shadow">
<div className="stat">
<div className="stat-title">Reflection Iterations</div>
<div className="stat-value">{reflection.data.iterations}</div>
</div>
</div>
{showProcess && (
<div className="space-y-4">
{Object.entries(messagesByIteration).map(([iter, msgs]) => (
<div key={iter} className="collapse collapse-arrow bg-base-200">
<input type="checkbox" />
<div className="collapse-title font-medium">
Iteration {parseInt(iter) + 1}
</div>
<div className="collapse-content">
{msgs.map((msg, idx) => (
<div key={idx} className={msg.type === "ai" ? "chat chat-end" : "chat chat-start"}>
<div className={`chat-bubble ${msg.type === "human" ? "chat-bubble-primary" : ""}`}>
{msg.content}
</div>
</div>
))}
</div>
</div>
))}
</div>
)}
<div className="divider">Final Output</div>
<div className="prose max-w-none">
{reflection.data.output}
</div>
</div>
)}
</div>
</div>
);
}
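The component provides an interactive UI with a collapsible per-iteration view of the reflection process.
Advanced Example: Producer-Critic Architecture with Streaming
1. Define the Producer and Critic Agents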
// lib/agents/producer-critic.ts
import { StateGraph, END, START, Annotation } from "@langchain/langgraph";
import { BaseMessage, HumanMessage, AIMessage } from "@langchain/core/messages";
import { ChatGoogleGenerativeAI } from "@langchain/google-genai";
import { z } from "zod";
import { minBy, maxBy } from "es-toolkit";
import { StructuredOutputParser } from "@langchain/core/output_parsers";
const CritiqueSchema = z.object({
score: z.number().min(0).max(100),
approved: z.boolean(),
issues: z.array(z.object({
category: z.enum(["accuracy", "clarity", "completeness", "style"]),
description: z.string(),
severity: z.enum(["minor", "major", "critical"]),
})),
suggestions: z.array(z.string()),
});
const ProducerCriticState = Annotation.Root({
task: Annotation<string>(),
drafts: Annotation<string[]>({
reducer: (x, y) => x.concat(y),
default: () => [],
}),
critiques: Annotation<typeof CritiqueSchema._type[]>({
reducer: (x, y) => x.concat(y),
default: () => [],
}),
iteration: Annotation<number>({
reducer: (_, y) => y,
default: () => 0,
}),
});
const producer = new ChatGoogleGenerativeAI({
modelName: "gemini-2.5-pro",
temperature: 0.7,
maxOutputTokens: 2048,
});
const critic = new ChatGoogleGenerativeAI({
modelName: "gemini-2.5-flash",
temperature: 0.3,
});
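Separate models are defined for the producer and critic roles, and the critique uses a structured output schema.
2. Implement the Producer Node with Context
// lib/agents/producer-critic.ts (continued)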
async function producerNode(state: typeof ProducerCriticState.State) {
const lastCritique = state.critiques[state.critiques.length - 1];
let prompt = `Task: ${state.task}`;
if (lastCritique) {
const criticalIssues = lastCritique.issues
.filter(i => i.severity === "critical")
.map(i => `- ${i.description}`)
.join("\n");
prompt += `\n\nThe previous draft received feedback. Critical issues to resolve:\n${criticalIssues}`;
prompt += `\n\nSuggestions for improvement:\n${lastCritique.suggestions.join("\n")}`;
prompt += `\n\nGenerate an improved version that addresses all of the feedback.`;
} else {
prompt += "\n\nGenerate a high-quality response.";
}
}
const response = await producer.invoke(prompt);
return {
drafts: [response.content as string],
iteration: state.iteration + 1,
};
}
The producer node folds the previous critique into its prompt to generate an improved draft.
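The prompt assembly can be pulled into a pure function, which makes it easy to test without calling a model. This is a sketch under assumptions: `buildProducerPrompt` is a hypothetical helper, and unlike the node above it omits a section entirely when there is nothing to list.

```typescript
type Severity = "minor" | "major" | "critical";
interface Issue { description: string; severity: Severity }
interface Critique { issues: Issue[]; suggestions: string[] }

// Hypothetical helper mirroring the prompt assembly in producerNode.
function buildProducerPrompt(task: string, lastCritique?: Critique): string {
  const parts = [`Task: ${task}`];
  if (!lastCritique) {
    parts.push("Generate a high-quality response.");
    return parts.join("\n\n");
  }
  const critical = lastCritique.issues.filter((i) => i.severity === "critical");
  if (critical.length > 0) {
    // Only mention critical issues when there are some to list.
    parts.push(
      "Critical issues to resolve:\n" +
        critical.map((i) => `- ${i.description}`).join("\n")
    );
  }
  if (lastCritique.suggestions.length > 0) {
    parts.push(
      "Suggestions:\n" + lastCritique.suggestions.map((s) => `- ${s}`).join("\n")
    );
  }
  parts.push("Generate an improved version that addresses all of the feedback.");
  return parts.join("\n\n");
}

console.log(
  buildProducerPrompt("Summarize the report", {
    issues: [{ description: "Figures are outdated", severity: "critical" }],
    suggestions: ["Cite the source table"],
  })
);
```

Skipping empty sections avoids sending the model a heading with nothing under it, which can happen in the node above when a critique contains no critical issues.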
3. Implement the Critic Node with Structured Output
// lib/agents/producer-critic.ts (continued)
async function criticNode(state: typeof ProducerCriticState.State) {
const latestDraft = state.drafts[state.drafts.length - 1];
const parser = StructuredOutputParser.fromZodSchema(CritiqueSchema);
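const prompt = `You are a professional critic. Evaluate this response to the task "${state.task}".
Response to evaluate:
${latestDraft}
Provide a detailed critique that follows this JSON schema:
${parser.getFormatInstructions()}
A score of 90 or above means the response is excellent and approved.
Feedback should be thorough but constructive.`;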
const response = await critic.invoke(prompt);
const critique = await parser.parse(response.content as string);
return {
critiques: [critique],
};
}
The critic returns structured feedback: a score, categorized issues, and suggestions for improvement.
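If the critic's reply is not valid JSON, the parse call throws and the whole run fails. One possible safeguard is a tolerant parser that falls back to a "not approved" critique so the loop can continue. This is a dependency-free sketch: `parseCritique` and `FALLBACK` are hypothetical names, and the checks are deliberately shallower than the Zod schema.

```typescript
interface Critique {
  score: number;
  approved: boolean;
  issues: unknown[];
  suggestions: string[];
}

// Hypothetical fallback used when the critic's reply cannot be parsed.
const FALLBACK: Critique = {
  score: 0,
  approved: false,
  issues: [],
  suggestions: ["The critique could not be parsed; review the draft again."],
};

function parseCritique(raw: string): Critique {
  // Models sometimes surround the JSON with prose; take the outermost {...} span.
  const start = raw.indexOf("{");
  const end = raw.lastIndexOf("}");
  if (start === -1 || end <= start) return FALLBACK;
  try {
    const data = JSON.parse(raw.slice(start, end + 1));
    if (typeof data.score !== "number" || typeof data.approved !== "boolean") {
      return FALLBACK;
    }
    return {
      score: data.score,
      approved: data.approved,
      issues: Array.isArray(data.issues) ? data.issues : [],
      suggestions: Array.isArray(data.suggestions) ? data.suggestions : [],
    };
  } catch {
    return FALLBACK;
  }
}

const reply = 'Here is my review: {"score": 92, "approved": true, "issues": [], "suggestions": []} Hope it helps.';
console.log(parseCritique(reply).score);          // 92
console.log(parseCritique("no json here").score); // 0
```

Falling back to an unapproved critique costs at most one extra iteration, whereas an uncaught parse error discards every draft produced so far.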
4. Advanced Routing Logic
// lib/agents/producer-critic.ts (continued)
function routingLogic(state: typeof ProducerCriticState.State) {
const lastCritique = state.critiques[state.critiques.length - 1];
// This router runs after the critic node, so a critique should always exist.
// If it does not, revise defensively ("critic" is not a target in the edge map).
if (!lastCritique) return "producer";
// Termination: approved by the critic, or iteration budget exhausted
if (lastCritique.approved || state.iteration >= 5) {
return END;
}
// Major rewrites, minor polish, and standard iterations all route back to
// the producer; the critique carried in state tells it how much to change.
return "producer";
}
export function createProducerCriticAgent() {
const workflow = new StateGraph(ProducerCriticState)
.addNode("producer", producerNode)
.addNode("critic", criticNode)
.addEdge(START, "producer")
.addEdge("producer", "critic")
.addConditionalEdges("critic", routingLogic, {
producer: "producer",
[END]: END,
});
return workflow.compile();
}
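The routing ends the loop on approval or after five iterations, and otherwise sends the draft back to the producer along with the critique's severity information.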
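One way to make the routing adaptive is to stop early when the score plateaus between consecutive critiques. The sketch below is an assumption layered on the tutorial's logic: `route` and the `MIN_GAIN` threshold are hypothetical, and only the iteration cap of 5 comes from the code above.

```typescript
interface Critique { score: number; approved: boolean }
type Route = "producer" | "end";

const MAX_ITERATIONS = 5; // same cap as routingLogic
const MIN_GAIN = 2;       // assumed: minimum score gain worth another round

function route(critiques: Critique[], iteration: number): Route {
  const last = critiques[critiques.length - 1];
  if (!last) return "producer";
  if (last.approved || iteration >= MAX_ITERATIONS) return "end";
  const previous = critiques[critiques.length - 2];
  // Stop early when the score has plateaued between consecutive critiques.
  if (previous && last.score - previous.score < MIN_GAIN) return "end";
  return "producer";
}

console.log(route([{ score: 60, approved: false }], 1)); // "producer"
console.log(
  route([{ score: 60, approved: false }, { score: 61, approved: false }], 2)
); // "end"
```

A plateau check trades a small amount of potential quality for a bounded cost, since later iterations that barely move the score still pay full price in tokens.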
5. Streaming API with Server-Sent Events
// app/api/producer-critic/route.ts
import { createProducerCriticAgent } from "@/lib/agents/producer-critic";
import { debounce } from "es-toolkit";
export const runtime = "nodejs";
export const maxDuration = 300;
export async function POST(req: Request) {
const { task } = await req.json();
const encoder = new TextEncoder();
const stream = new TransformStream();
const writer = stream.writable.getWriter();
// Write each event as soon as it is produced. A debounced writer would drop
// every event but the last one in its window and could fire after the
// writer has been closed.
const writeEvent = async (data: unknown) => {
await writer.write(
encoder.encode(`data: ${JSON.stringify(data)}\n\n`)
);
};
const agent = createProducerCriticAgent();
(async () => {
// The critic node returns only its critique, so remember the iteration
// reported by the most recent producer event.
let currentIteration = 0;
try {
const eventStream = await agent.streamEvents(
{ task, drafts: [], critiques: [], iteration: 0 },
{ version: "v2" }
);
for await (const event of eventStream) {
if (event.event === "on_chain_end" && event.name === "producer") {
currentIteration = event.data.output.iteration;
await writeEvent({
type: "draft",
iteration: currentIteration,
content: event.data.output.drafts[event.data.output.drafts.length - 1],
});
}
if (event.event === "on_chain_end" && event.name === "critic") {
const critique = event.data.output.critiques[event.data.output.critiques.length - 1];
await writeEvent({
type: "critique",
iteration: currentIteration,
score: critique.score,
approved: critique.approved,
issues: critique.issues,
});
}
}
await writeEvent({ type: "complete" });
} catch (error) {
await writeEvent({ type: "error", error: String(error) });
} finally {
await writer.close();
}
})();
return new Response(stream.readable, {
headers: {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
"Connection": "keep-alive",
},
});
}
The route streams reflection events in real time over Server-Sent Events so the UI can update progressively.
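The wire format is simple: each event is a `data:` line followed by a blank line. A network chunk can end in the middle of an event, so a client has to buffer until the blank line arrives. The sketch below shows both sides without any network code; `formatSseEvent` and `createSseParser` are hypothetical helper names.

```typescript
// Format one Server-Sent Events frame: a "data:" line followed by a blank line.
function formatSseEvent(data: unknown): string {
  return `data: ${JSON.stringify(data)}\n\n`;
}

// Hypothetical incremental parser: chunks can split a frame anywhere, so the
// unfinished tail stays in a buffer until its terminating blank line arrives.
function createSseParser(onEvent: (data: unknown) => void) {
  let buffer = "";
  return (chunk: string) => {
    buffer += chunk;
    const frames = buffer.split("\n\n");
    buffer = frames.pop() ?? ""; // the last piece is incomplete (or empty)
    for (const frame of frames) {
      for (const line of frame.split("\n")) {
        if (line.startsWith("data: ")) onEvent(JSON.parse(line.slice(6)));
      }
    }
  };
}

const received: unknown[] = [];
const feed = createSseParser((e) => received.push(e));
const wire =
  formatSseEvent({ type: "draft", iteration: 1 }) +
  formatSseEvent({ type: "complete" });
feed(wire.slice(0, 20)); // first chunk ends mid-frame
feed(wire.slice(20));
console.log(received.length); // 2
```

Parsing each chunk on its own, without a buffer, would silently lose any event that straddles a chunk boundary.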
6. Advanced Frontend with Real-Time Visualization
// components/ProducerCriticDemo.tsx
"use client";
import { useState, useCallback } from "react";
import { useMutation } from "@tanstack/react-query";
import { partition, groupBy } from "es-toolkit";
interface StreamEvent {
type: "draft" | "critique" | "complete" | "error";
iteration?: number;
content?: string;
score?: number;
approved?: boolean;
issues?: Array<{
category: string;
description: string;
severity: string;
}>;
error?: string;
}
export default function ProducerCriticDemo() {
const [task, setTask] = useState("");
const [events, setEvents] = useState<StreamEvent[]>([]);
const [isStreaming, setIsStreaming] = useState(false);
const startReflection = useCallback(async () => {
setEvents([]);
setIsStreaming(true);
try {
const response = await fetch("/api/producer-critic", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ task }),
});
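if (!response.ok || !response.body) throw new Error("Request failed");
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = "";
while (true) {
const { done, value } = await reader.read();
if (done) break;
// A network chunk can end mid-event, so buffer until a blank line arrives
buffer += decoder.decode(value, { stream: true });
const frames = buffer.split("\n\n");
buffer = frames.pop() ?? "";
for (const frame of frames) {
for (const line of frame.split("\n")) {
if (!line.startsWith("data: ")) continue;
try {
const event = JSON.parse(line.slice(6)) as StreamEvent;
setEvents(prev => [...prev, event]);
} catch {
// Ignore malformed events
}
}
}
}
// The stream has ended, whether or not a "complete" event was received
setIsStreaming(false);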
} catch (error) {
console.error("Stream error:", error);
setIsStreaming(false);
}
}, [task]);
// Group events by iteration
const [drafts, critiques] = partition(
events.filter(e => e.type === "draft" || e.type === "critique"),
e => e.type === "draft"
);
const iterations = groupBy(
[...drafts, ...critiques],
e => e.iteration?.toString() || "0"
);
const finalDraft = drafts[drafts.length - 1];
const finalCritique = critiques[critiques.length - 1];
return (
<div className="container mx-auto p-4">
<div className="card bg-base-100 shadow-xl">
<div className="card-body">
<h2 className="card-title">Producer-Critic Reflection System</h2>
<div className="form-control">
<textarea
className="textarea textarea-bordered"
placeholder="Describe your task..."
value={task}
onChange={(e) => setTask(e.target.value)}
rows={3}
disabled={isStreaming}
/>
</div>
<div className="card-actions justify-end mt-4">
<button
className="btn btn-primary"
onClick={startReflection}
disabled={isStreaming || !task.trim()}
>
{isStreaming ? (
<>
<span className="loading loading-spinner"></span>
Processing...
</>
) : "Start Reflection"}
</button>
</div>
</div>
</div>
{events.length > 0 && (
<div className="grid grid-cols-1 lg:grid-cols-2 gap-4 mt-6">
{/* Iteration timeline */}
<div className="card bg-base-100 shadow">
<div className="card-body">
<h3 className="card-title text-lg">Reflection Process</h3>
<ul className="timeline timeline-vertical">
{Object.entries(iterations).map(([iter, iterEvents]) => {
const draft = iterEvents.find(e => e.type === "draft");
const critique = iterEvents.find(e => e.type === "critique");
return (
<li key={iter}>
<div className="timeline-middle">
<svg
xmlns="http://www.w3.org/2000/svg"
viewBox="0 0 20 20"
fill="currentColor"
className="h-5 w-5"
>
<path
fillRule="evenodd"
d="M10 18a8 8 0 100-16 8 8 0 000 16zm3.857-9.809a.75.75 0 00-1.214-.882l-3.483 4.79-1.88-1.88a.75.75 0 10-1.06 1.061l2.5 2.5a.75.75 0 001.137-.089l4-5.5z"
clipRule="evenodd"
/>
</svg>
</div>
<div className="timeline-end timeline-box">
<div className="text-lg font-black">Iteration {iter}</div>
{critique && (
<div className="stats stats-horizontal shadow mt-2">
<div className="stat">
<div className="stat-title">Score</div>
<div className="stat-value text-2xl">{critique.score}</div>
</div>
<div className="stat">
<div className="stat-title">Status</div>
<div className={`stat-value text-2xl ${critique.approved ? "text-success" : "text-warning"}`}>
{critique.approved ? "✓" : "↻"}
</div>
</div>
</div>
)}
{critique?.issues && (
<div className="mt-2">
<p className="font-semibold">Issues found:</p>
{critique.issues.map((issue, idx) => (
<div key={idx} className={`badge ${issue.severity === "critical" ? "badge-error" : "badge-warning"} gap-2 mr-1`}>
{issue.category}
</div>
))}
</div>
)}
</div>
<hr />
</li>
);
})}
</ul>
</div>
</div>
{/* Final output */}
<div className="card bg-base-100 shadow">
<div className="card-body">
<h3 className="card-title text-lg">Final Output</h3>
{finalCritique?.approved && (
<div className="alert alert-success">
<span>Output approved with score: {finalCritique.score}/100</span>
</div>
)}
{finalDraft && (
<div className="prose max-w-none">
<div className="mockup-code">
<pre><code>{finalDraft.content}</code></pre>
</div>
</div>
)}
{isStreaming && (
<div className="flex justify-center">
<span className="loading loading-dots loading-lg"></span>
</div>
)}
</div>
</div>
</div>
)}
</div>
);
}
The component renders a timeline visualization that updates as events stream in.
7. Performance Optimization Through Caching
// lib/cache/reflection-cache.ts
import { kv } from "@vercel/kv";
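import { createHash } from "node:crypto";
// es-toolkit does not export a hash helper, so the cache key is derived with SHA-256.
const hash = (value: string) => createHash("sha256").update(value).digest("hex");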
interface CacheEntry {
task: string;
output: string;
score: number;
timestamp: number;
}
export class ReflectionCache {
private readonly ttl = 3600; // 1 hour
async get(task: string): Promise<CacheEntry | null> {
const key = `reflection:${hash(task)}`;
const cached = await kv.get<CacheEntry>(key);
if (cached && Date.now() - cached.timestamp < this.ttl * 1000) {
return cached;
}
return null;
}
async set(task: string, output: string, score: number): Promise<void> {
const key = `reflection:${hash(task)}`;
const entry: CacheEntry = {
task,
output,
score,
timestamp: Date.now(),
};
await kv.set(key, entry, { ex: this.ttl });
}
async getSimilar(task: string, threshold = 0.8): Promise<CacheEntry[]> {
// Naive similarity search: scans every cached key, so it only suits small caches
const allKeys = await kv.keys("reflection:*");
const similar: CacheEntry[] = [];
for (const key of allKeys) {
const entry = await kv.get<CacheEntry>(key);
if (entry) {
// Simple word-overlap check (replace with proper semantic similarity)
const similarity = this.calculateSimilarity(task, entry.task);
if (similarity > threshold) {
similar.push(entry);
}
}
}
return similar;
}
private calculateSimilarity(a: string, b: string): number {
// Simplified similarity: Jaccard overlap of space-separated words
const wordsA = new Set(a.toLowerCase().split(" "));
const wordsB = new Set(b.toLowerCase().split(" "));
const intersection = new Set([...wordsA].filter(x => wordsB.has(x)));
const union = new Set([...wordsA, ...wordsB]);
return intersection.size / union.size;
}
}
The cache reduces redundant reflection loops for repeated or similar tasks.
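Exact-match lookups miss when two tasks differ only in case or spacing. A possible refinement, sketched here as an assumption rather than part of the class above, is to normalize the task before hashing. `normalizeTask` and `cacheKey` are hypothetical helpers, and SHA-256 from Node's built-in `crypto` module is one reasonable choice of hash.

```typescript
import { createHash } from "node:crypto";

// Hypothetical normalization: trim, lowercase, and collapse runs of whitespace.
function normalizeTask(task: string): string {
  return task.trim().toLowerCase().replace(/\s+/g, " ");
}

// Hypothetical key scheme: "reflection:" prefix plus the SHA-256 hex digest.
function cacheKey(task: string): string {
  const digest = createHash("sha256").update(normalizeTask(task)).digest("hex");
  return `reflection:${digest}`;
}

console.log(cacheKey("  Write   an Essay ") === cacheKey("write an essay")); // true
console.log(cacheKey("write an essay") === cacheKey("write a poem"));        // false
```

Normalization only catches trivial variations; tasks that are worded differently but mean the same thing still need the similarity search.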
8. Cost Tracking and Optimization
// lib/monitoring/cost-tracker.ts
interface ReflectionMetrics {
totalTokens: number;
inputTokens: number;
outputTokens: number;
iterations: number;
duration: number;
estimatedCost: number;
}
export class CostTracker {
private metrics: ReflectionMetrics = {
totalTokens: 0,
inputTokens: 0,
outputTokens: 0,
iterations: 0,
duration: 0,
estimatedCost: 0,
};
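// Illustrative USD rates per 1,000 tokens (trackIteration divides by 1000); check current provider pricing
private readonly costPerToken = {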
"gemini-2.5-pro": { input: 0.00125, output: 0.005 },
"gemini-2.5-flash": { input: 0.00015, output: 0.0006 },
};
trackIteration(model: string, inputTokens: number, outputTokens: number): void {
this.metrics.inputTokens += inputTokens;
this.metrics.outputTokens += outputTokens;
this.metrics.totalTokens += inputTokens + outputTokens;
this.metrics.iterations += 1;
const modelCost = this.costPerToken[model as keyof typeof this.costPerToken];
if (modelCost) {
this.metrics.estimatedCost +=
(inputTokens * modelCost.input + outputTokens * modelCost.output) / 1000;
}
}
shouldContinue(maxCost: number = 0.10): boolean {
return this.metrics.estimatedCost < maxCost;
}
getMetrics(): ReflectionMetrics {
return { ...this.metrics };
}
}
The tracker records token usage and estimated cost so the reflection loop can respect a budget.
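A short worked example makes the arithmetic concrete. It reuses the illustrative "gemini-2.5-pro" rates from the tracker, treated as USD per 1,000 tokens; the token counts and the helper names `iterationCost` and `iterationsWithinBudget` are assumptions for illustration.

```typescript
// Illustrative rates in USD per 1,000 tokens, copied from the tracker above.
const RATE = { input: 0.00125, output: 0.005 };

// Cost of one model call: tokens times the per-1K rate, divided by 1000.
function iterationCost(inputTokens: number, outputTokens: number): number {
  return (inputTokens * RATE.input + outputTokens * RATE.output) / 1000;
}

// Hypothetical helper: how many equally sized iterations fit in a budget.
function iterationsWithinBudget(budget: number, costPerIteration: number): number {
  return Math.floor(budget / costPerIteration);
}

const cost = iterationCost(1200, 800); // (1.5 + 4.0) / 1000
console.log(cost.toFixed(4));                    // "0.0055"
console.log(iterationsWithinBudget(0.1, cost));  // 18
```

With these assumed numbers, the default budget of 0.10 allows about 18 producer calls, well above the five-iteration cap, so the cap rather than the budget would normally end the loop.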
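Conclusion
The reflection pattern turns AI agents from single-shot responders into iterative learners capable of self-improvement. By implementing a producer-critic architecture with proper state management, streaming, and cost controls, you can deploy sophisticated reflection systems on Vercel's serverless platform. The key is balancing quality gains against compute cost through smart caching, early stopping, and adaptive routing. Start with basic self-reflection for simple tasks, then move to multi-agent producer-critic systems for complex scenarios that demand higher-quality output.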