草案 智能体设计模式 - 学习与适应

agentic-designlangchainlanggraphaitypescriptmachine-learning
By sko X opus 4.19/20/202512 min read

学习如何使用 TypeScript、LangGraph 和针对 Vercel 无服务器平台优化的现代内存架构来构建真正通过经验改进的智能体。

心智模型:不断进化的餐厅

将学习智能体想象成一家适应顾客的餐厅。最初,它提供通用菜肴(基线响应)。随着时间推移,它学习常客的偏好(用户特定适应),发现受欢迎的组合(模式识别),根据反馈调整食谱(强化学习),甚至根据过去类似的顾客预测新顾客可能喜欢什么(迁移学习)。就像成功的餐厅从静态菜单演变为动态的、了解顾客的体验,您的智能体将从简单的响应者转变为随着每次交互而改进的智能系统。

基础示例:带记忆的自适应聊天智能体

让我们构建一个简单的智能体,它能记住用户偏好并相应地调整其响应。

// app/api/adaptive-agent/route.ts
import { StateGraph, MemorySaver } from "@langchain/langgraph";
import { ChatGoogleGenerativeAI } from "@langchain/google-genai";
import { HumanMessage, AIMessage } from "@langchain/core/messages";
import { Redis } from "@upstash/redis";
import { groupBy, maxBy, sortBy } from "es-toolkit";
import { z } from "zod";

// 定义自适应状态模式
const AdaptiveStateSchema = z.object({
  messages: z.array(z.any()),
  userPreferences: z.object({
    style: z.enum(['concise', 'detailed', 'technical']).optional(),
    topics: z.array(z.string()).optional(),
  }).default({}),
  interactionCount: z.number().default(0),
  feedbackScores: z.array(z.number()).default([]),
});

type AdaptiveState = z.infer<typeof AdaptiveStateSchema>;

// 初始化 Redis 用于持久化内存
const redis = new Redis({
  url: process.env.UPSTASH_REDIS_URL!,
  token: process.env.UPSTASH_REDIS_TOKEN!,
});

// 创建自适应智能体
async function createAdaptiveAgent() {
  const model = new ChatGoogleGenerativeAI({
    temperature: 0.7,
    modelName: "gemini-pro",
  });

  // 用于会话持久化的内存保存器
  const checkpointer = new MemorySaver();

  // 定义智能体逻辑
  async function agentNode(state: AdaptiveState) {
    const { messages, userPreferences } = state;

    // 根据学习到的偏好调整提示
    const systemPrompt = `你是一个有帮助的助手。
      ${userPreferences.style ? `以${userPreferences.style}的方式回应。` : ''}
      ${userPreferences.topics?.length ? `用户对以下内容感兴趣:${userPreferences.topics.join(', ')}` : ''}
      交互次数:${state.interactionCount}`;

    const response = await model.invoke([
      { role: "system", content: systemPrompt },
      ...messages
    ]);

    return {
      messages: [...messages, response],
      interactionCount: state.interactionCount + 1,
    };
  }

  // 构建图
  const workflow = new StateGraph<AdaptiveState>({
    channels: AdaptiveStateSchema.shape,
  })
    .addNode("agent", agentNode)
    .setEntryPoint("agent")
    .setFinishPoint("agent");

  return workflow.compile({ checkpointer });
}

// API 处理器
export async function POST(req: Request) {
  const { message, sessionId, feedback } = await req.json();

  // 检索或初始化用户状态
  const storedState = await redis.get(`session:${sessionId}`) as AdaptiveState | null;
  const initialState: AdaptiveState = storedState || {
    messages: [],
    userPreferences: {},
    interactionCount: 0,
    feedbackScores: [],
  };

  // 处理提供的反馈
  if (feedback) {
    initialState.feedbackScores.push(feedback);

    // 简单的偏好学习
    if (initialState.feedbackScores.length > 3) {
      const avgScore = initialState.feedbackScores.reduce((a, b) => a + b) / initialState.feedbackScores.length;
      if (avgScore < 3) {
        initialState.userPreferences.style = 'detailed';
      } else if (avgScore > 4) {
        initialState.userPreferences.style = 'concise';
      }
    }
  }

  // 运行智能体
  const agent = await createAdaptiveAgent();
  const result = await agent.invoke(
    {
      ...initialState,
      messages: [...initialState.messages, new HumanMessage(message)],
    },
    {
      configurable: { thread_id: sessionId },
    }
  );

  // 保存更新的状态
  await redis.set(`session:${sessionId}`, result, {
    ex: 86400 * 7, // 7天TTL
  });

  return Response.json({
    response: result.messages[result.messages.length - 1].content,
    preferences: result.userPreferences,
    interactionCount: result.interactionCount,
  });
}

这个基础智能体跟踪用户交互,从反馈分数中学习,并调整其响应风格。它使用 Upstash Redis 作为无服务器兼容的持久存储。

// components/AdaptiveChatInterface.tsx
import { useState } from 'react';
import { useMutation } from '@tanstack/react-query';
import { debounce } from 'es-toolkit';

interface ChatResponse {
  response: string;
  preferences: any;
  interactionCount: number;
}

export function AdaptiveChatInterface() {
  const [message, setMessage] = useState('');
  const [sessionId] = useState(() => crypto.randomUUID());
  const [lastResponse, setLastResponse] = useState<string>('');

  const chatMutation = useMutation({
    mutationFn: async (params: { message: string; feedback?: number }) => {
      const response = await fetch('/api/adaptive-agent', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ ...params, sessionId }),
      });
      return response.json() as Promise<ChatResponse>;
    },
    onSuccess: (data) => {
      setLastResponse(data.response);
    },
  });

  // 防抖反馈处理器
  const handleFeedback = debounce((score: number) => {
    chatMutation.mutate({ message: '', feedback: score });
  }, 500);

  return (
    <div className="card bg-base-100 shadow-xl">
      <div className="card-body">
        <h2 className="card-title">自适应聊天智能体</h2>

        {lastResponse && (
          <div className="alert alert-info">
            <span>{lastResponse}</span>
          </div>
        )}

        <div className="form-control">
          <input
            type="text"
            placeholder="问任何问题..."
            className="input input-bordered"
            value={message}
            onChange={(e) => setMessage(e.target.value)}
            onKeyPress={(e) => {
              if (e.key === 'Enter') {
                chatMutation.mutate({ message });
                setMessage('');
              }
            }}
          />
        </div>

        {lastResponse && (
          <div className="rating rating-lg">
            {[1, 2, 3, 4, 5].map((score) => (
              <input
                key={score}
                type="radio"
                name="rating"
                className="mask mask-star-2 bg-orange-400"
                onClick={() => handleFeedback(score)}
              />
            ))}
          </div>
        )}

        <div className="stats shadow">
          <div className="stat">
            <div className="stat-title">交互次数</div>
            <div className="stat-value text-primary">
              {chatMutation.data?.interactionCount || 0}
            </div>
          </div>
        </div>
      </div>
    </div>
  );
}

前端通过星级评分系统收集用户反馈并显示交互统计,创建完整的反馈循环。

高级示例:带经验回放的多智能体学习系统

现在让我们构建一个复杂的系统,拥有多个从共享经验中学习的专门智能体。

// lib/memory/experience-store.ts
import { PineconeStore } from "@langchain/pinecone";
import { GoogleGenerativeAIEmbeddings } from "@langchain/google-genai";
import { Pinecone } from "@pinecone-database/pinecone";
import { chunk, sortBy, take, groupBy, maxBy } from "es-toolkit";
import { z } from "zod";

// 经验模式
const ExperienceSchema = z.object({
  id: z.string(),
  interaction: z.object({
    input: z.string(),
    context: z.record(z.any()),
    agentType: z.string(),
  }),
  outcome: z.object({
    response: z.string(),
    success: z.boolean(),
    metrics: z.object({
      latency: z.number(),
      tokenCount: z.number(),
      userSatisfaction: z.number().optional(),
    }),
  }),
  timestamp: z.string(),
  embedding: z.array(z.number()).optional(),
});

type Experience = z.infer<typeof ExperienceSchema>;

export class ExperienceReplayBuffer {
  private vectorStore: PineconeStore;
  private embeddings: GoogleGenerativeAIEmbeddings;
  private bufferSize = 1000;
  private priorityAlpha = 0.6; // 优先级指数

  constructor() {
    const pinecone = new Pinecone({
      apiKey: process.env.PINECONE_API_KEY!,
    });

    this.embeddings = new GoogleGenerativeAIEmbeddings({
      modelName: "embedding-001",
    });

    this.vectorStore = new PineconeStore(this.embeddings, {
      pineconeIndex: pinecone.index(process.env.PINECONE_INDEX!),
      namespace: "experiences",
    });
  }

  async store(experience: Experience): Promise<void> {
    // 基于TD误差或结果指标计算优先级
    const priority = this.calculatePriority(experience);

    await this.vectorStore.addDocuments([
      {
        pageContent: JSON.stringify({
          interaction: experience.interaction,
          outcome: experience.outcome,
        }),
        metadata: {
          id: experience.id,
          timestamp: experience.timestamp,
          priority,
          agentType: experience.interaction.agentType,
          success: experience.outcome.success,
          userSatisfaction: experience.outcome.metrics.userSatisfaction,
        },
      },
    ]);

    // 维护缓冲区大小
    await this.pruneOldExperiences();
  }

  async sample(
    context: any,
    k: number = 5,
    strategy: 'uniform' | 'prioritized' = 'prioritized'
  ): Promise<Experience[]> {
    const query = JSON.stringify(context);
    const results = await this.vectorStore.similaritySearch(query, k * 2);

    if (strategy === 'prioritized') {
      // 优先级经验回放
      const experiences = results.map(doc => ({
        ...JSON.parse(doc.pageContent),
        priority: doc.metadata.priority || 1,
      }));

      // 基于优先级采样
      const sorted = sortBy(experiences, exp => -exp.priority);
      return take(sorted, k);
    }

    return take(results.map(doc => JSON.parse(doc.pageContent)), k);
  }

  private calculatePriority(experience: Experience): number {
    // 对意外结果赋予更高优先级
    const successWeight = experience.outcome.success ? 0.3 : 0.7;
    const satisfactionWeight = experience.outcome.metrics.userSatisfaction
      ? (5 - experience.outcome.metrics.userSatisfaction) / 5
      : 0.5;

    return Math.pow(successWeight + satisfactionWeight, this.priorityAlpha);
  }

  private async pruneOldExperiences(): Promise<void> {
    // 维护缓冲区大小的实现
    // 当缓冲区超过限制时删除最旧的经验
  }
}

// lib/agents/specialized-agents.ts
import { StateGraph } from "@langchain/langgraph";
import { ChatGoogleGenerativeAI } from "@langchain/google-genai";
import { ExperienceReplayBuffer } from "./memory/experience-store";
import { filter, map, reduce } from "es-toolkit";

interface LearningAgentState {
  messages: any[];
  experiences: any[];
  adaptationMetrics: {
    successRate: number;
    avgLatency: number;
    confidenceScore: number;
  };
}

class SpecializedLearningAgent {
  private model: ChatGoogleGenerativeAI;
  private experienceBuffer: ExperienceReplayBuffer;
  private agentType: string;
  private learningRate = 0.1;

  constructor(agentType: 'researcher' | 'coder' | 'analyst') {
    this.agentType = agentType;
    this.model = new ChatGoogleGenerativeAI({
      temperature: 0.3,
      modelName: "gemini-pro",
    });
    this.experienceBuffer = new ExperienceReplayBuffer();
  }

  async createWorkflow() {
    const workflow = new StateGraph<LearningAgentState>({
      channels: {
        messages: { value: (x: any[], y: any[]) => [...x, ...y] },
        experiences: { value: (x: any[], y: any[]) => [...x, ...y] },
        adaptationMetrics: {
          value: (x: any, y: any) => ({ ...x, ...y })
        },
      },
    });

    // 检索相关经验
    workflow.addNode("retrieve_experiences", async (state) => {
      const context = state.messages[state.messages.length - 1];
      const experiences = await this.experienceBuffer.sample(context, 5);

      return { experiences };
    });

    // 基于经验调整行为
    workflow.addNode("adapt_strategy", async (state) => {
      const successfulExperiences = filter(
        state.experiences,
        exp => exp.outcome.success
      );

      // 计算适应指标
      const successRate = successfulExperiences.length / state.experiences.length;
      const avgLatency = reduce(
        state.experiences,
        (acc, exp) => acc + exp.outcome.metrics.latency,
        0
      ) / state.experiences.length;

      // 确定最优策略
      const strategy = this.determineStrategy(successfulExperiences);

      return {
        adaptationMetrics: {
          successRate,
          avgLatency,
          confidenceScore: this.calculateConfidence(state.experiences),
        },
      };
    });

    // 使用学习到的行为执行
    workflow.addNode("execute", async (state) => {
      const systemPrompt = this.buildAdaptivePrompt(
        state.experiences,
        state.adaptationMetrics
      );

      const startTime = Date.now();
      const response = await this.model.invoke([
        { role: "system", content: systemPrompt },
        ...state.messages,
      ]);
      const latency = Date.now() - startTime;

      // 将此交互存储为新经验
      const experience = {
        id: crypto.randomUUID(),
        interaction: {
          input: state.messages[state.messages.length - 1].content,
          context: state.adaptationMetrics,
          agentType: this.agentType,
        },
        outcome: {
          response: response.content,
          success: true, // 将根据反馈更新
          metrics: {
            latency,
            tokenCount: response.usage?.total_tokens || 0,
          },
        },
        timestamp: new Date().toISOString(),
      };

      await this.experienceBuffer.store(experience);

      return {
        messages: [response],
      };
    });

    // 连接节点
    workflow
      .addEdge("retrieve_experiences", "adapt_strategy")
      .addEdge("adapt_strategy", "execute")
      .setEntryPoint("retrieve_experiences")
      .setFinishPoint("execute");

    return workflow.compile();
  }

  private determineStrategy(experiences: any[]): string {
    // 按模式分组经验
    const patterns = groupBy(experiences, exp =>
      exp.interaction.input.split(' ')[0].toLowerCase()
    );

    // 找到最成功的模式
    const bestPattern = maxBy(
      Object.entries(patterns),
      ([_, exps]) => filter(exps, e => e.outcome.success).length
    );

    return bestPattern ? bestPattern[0] : 'default';
  }

  private calculateConfidence(experiences: any[]): number {
    if (experiences.length === 0) return 0.5;

    const weights = experiences.map((_, idx) =>
      Math.exp(-idx * 0.5) // 新近性的指数衰减
    );

    const weightedSuccess = reduce(
      experiences,
      (acc, exp, idx) => acc + (exp.outcome.success ? weights[idx] : 0),
      0
    );

    return weightedSuccess / reduce(weights, (a, b) => a + b, 0);
  }

  private buildAdaptivePrompt(
    experiences: any[],
    metrics: any
  ): string {
    const successfulPatterns = filter(
      experiences,
      exp => exp.outcome.success
    ).map(exp => exp.interaction.input);

    return `你是专门的${this.agentType}智能体。

      基于过去的交互:
      - 成功率:${(metrics.successRate * 100).toFixed(1)}%
      - 平均响应时间:${metrics.avgLatency}ms
      - 置信水平:${(metrics.confidenceScore * 100).toFixed(1)}%

      观察到的成功模式:
      ${successfulPatterns.slice(0, 3).join('\n')}

      根据这些学习调整你的响应风格和方法。`;
  }
}

// lib/agents/multi-agent-coordinator.ts
export class MultiAgentLearningCoordinator {
  private agents: Map<string, SpecializedLearningAgent>;
  private routingModel: ChatGoogleGenerativeAI;

  constructor() {
    this.agents = new Map([
      ['researcher', new SpecializedLearningAgent('researcher')],
      ['coder', new SpecializedLearningAgent('coder')],
      ['analyst', new SpecializedLearningAgent('analyst')],
    ]);

    this.routingModel = new ChatGoogleGenerativeAI({
      temperature: 0,
      modelName: "gemini-pro",
    });
  }

  async route(input: string): Promise<string> {
    const routingPrompt = `确定哪个专家智能体应该处理这个请求:
      - researcher:用于信息收集、事实核查、研究任务
      - coder:用于编程、调试、代码生成
      - analyst:用于数据分析、洞察、战略规划

      输入:"${input}"

      仅回复智能体名称。`;

    const response = await this.routingModel.invoke(routingPrompt);
    return response.content.trim().toLowerCase();
  }

  async process(input: string, sessionId: string): Promise<any> {
    // 确定要使用的智能体
    const agentType = await this.route(input);
    const agent = this.agents.get(agentType);

    if (!agent) {
      throw new Error(`未知的智能体类型:${agentType}`);
    }

    const workflow = await agent.createWorkflow();
    const result = await workflow.invoke({
      messages: [{ role: "user", content: input }],
      experiences: [],
      adaptationMetrics: {
        successRate: 0.5,
        avgLatency: 0,
        confidenceScore: 0.5,
      },
    });

    return {
      response: result.messages[result.messages.length - 1].content,
      agentType,
      metrics: result.adaptationMetrics,
    };
  }
}

// app/api/learning-system/route.ts
import { MultiAgentLearningCoordinator } from "@/lib/agents/multi-agent-coordinator";
import { NextRequest } from "next/server";

const coordinator = new MultiAgentLearningCoordinator();

export async function POST(req: NextRequest) {
  // 为无服务器禁用后台回调
  process.env.LANGCHAIN_CALLBACKS_BACKGROUND = "false";

  const { message, sessionId, feedback } = await req.json();

  try {
    const result = await coordinator.process(message, sessionId);

    return Response.json({
      success: true,
      ...result,
    });
  } catch (error) {
    console.error("学习系统错误:", error);
    return Response.json(
      { success: false, error: "处理失败" },
      { status: 500 }
    );
  }
}

这个高级系统实现了优先级经验回放、多智能体协调以及基于历史性能的自适应策略选择。

// components/LearningSystemDashboard.tsx
import { useState } from 'react';
import { useQuery, useMutation } from '@tanstack/react-query';
import { groupBy, sortBy } from 'es-toolkit';
import { LineChart, Line, XAxis, YAxis, CartesianGrid, Tooltip, Legend } from 'recharts';

interface SystemMetrics {
  agentType: string;
  metrics: {
    successRate: number;
    avgLatency: number;
    confidenceScore: number;
  };
}

export function LearningSystemDashboard() {
  const [sessionId] = useState(() => crypto.randomUUID());
  const [history, setHistory] = useState<SystemMetrics[]>([]);

  const sendMessage = useMutation({
    mutationFn: async (message: string) => {
      const response = await fetch('/api/learning-system', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ message, sessionId }),
      });
      return response.json();
    },
    onSuccess: (data) => {
      setHistory(prev => [...prev, {
        agentType: data.agentType,
        metrics: data.metrics,
      }]);
    },
  });

  // 计算性能趋势
  const performanceData = history.map((item, index) => ({
    interaction: index + 1,
    confidence: item.metrics.confidenceScore * 100,
    success: item.metrics.successRate * 100,
  }));

  // 智能体使用分布
  const agentUsage = Object.entries(
    groupBy(history, item => item.agentType)
  ).map(([agent, items]) => ({
    agent,
    count: items.length,
    avgConfidence: items.reduce((acc, item) =>
      acc + item.metrics.confidenceScore, 0
    ) / items.length * 100,
  }));

  return (
    <div className="container mx-auto p-4">
      <div className="grid grid-cols-1 lg:grid-cols-2 gap-6">
        {/* 聊天界面 */}
        <div className="card bg-base-100 shadow-xl">
          <div className="card-body">
            <h2 className="card-title">多智能体学习系统</h2>

            <div className="form-control">
              <div className="input-group">
                <input
                  type="text"
                  placeholder="问任何问题..."
                  className="input input-bordered flex-1"
                  onKeyPress={(e) => {
                    if (e.key === 'Enter') {
                      sendMessage.mutate((e.target as HTMLInputElement).value);
                      (e.target as HTMLInputElement).value = '';
                    }
                  }}
                />
                <button
                  className="btn btn-primary"
                  onClick={() => {
                    const input = document.querySelector('input');
                    if (input?.value) {
                      sendMessage.mutate(input.value);
                      input.value = '';
                    }
                  }}
                >
                  发送
                </button>
              </div>
            </div>

            {sendMessage.data && (
              <div className="alert alert-info mt-4">
                <div>
                  <span className="badge badge-secondary mr-2">
                    {sendMessage.data.agentType}
                  </span>
                  {sendMessage.data.response}
                </div>
              </div>
            )}
          </div>
        </div>

        {/* 性能指标 */}
        <div className="card bg-base-100 shadow-xl">
          <div className="card-body">
            <h2 className="card-title">学习进展</h2>

            {performanceData.length > 0 && (
              <LineChart width={400} height={200} data={performanceData}>
                <CartesianGrid strokeDasharray="3 3" />
                <XAxis dataKey="interaction" />
                <YAxis />
                <Tooltip />
                <Legend />
                <Line
                  type="monotone"
                  dataKey="confidence"
                  stroke="#8884d8"
                  name="置信度 %"
                />
                <Line
                  type="monotone"
                  dataKey="success"
                  stroke="#82ca9d"
                  name="成功率 %"
                />
              </LineChart>
            )}
          </div>
        </div>

        {/* 智能体统计 */}
        <div className="card bg-base-100 shadow-xl lg:col-span-2">
          <div className="card-body">
            <h2 className="card-title">智能体性能</h2>

            <div className="overflow-x-auto">
              <table className="table table-zebra">
                <thead>
                  <tr>
                    <th>智能体类型</th>
                    <th>使用次数</th>
                    <th>平均置信度</th>
                  </tr>
                </thead>
                <tbody>
                  {agentUsage.map(agent => (
                    <tr key={agent.agent}>
                      <td className="font-bold">{agent.agent}</td>
                      <td>{agent.count}</td>
                      <td>
                        <progress
                          className="progress progress-primary w-32"
                          value={agent.avgConfidence}
                          max="100"
                        />
                        <span className="ml-2">{agent.avgConfidence.toFixed(1)}%</span>
                      </td>
                    </tr>
                  ))}
                </tbody>
              </table>
            </div>
          </div>
        </div>
      </div>
    </div>
  );
}

仪表板提供学习进展、智能体性能指标和系统置信度趋势的实时可视化。

结论

学习和适应将静态的LLM应用程序转变为随着每次交互而改进的动态系统。通过结合LangGraph的状态编排、基于向量的经验回放和轻量级强化学习技术,您可以构建真正从经验中学习的智能体,同时在Vercel等无服务器平台上保持出色的性能。从简单的偏好学习开始,然后随着系统的成熟逐步添加复杂的功能,如优先级经验回放和多智能体协调。