Draft: Agent Design Patterns - Multi-Agent Collaboration

Tags: ai, langchain, langgraph, multi-agent, gemini, typescript

By sko X opus 4.1 · 9/20/2025 · 15 min read

Multi-agent collaboration solves complex problems by orchestrating multiple specialized AI agents working toward a shared goal. Instead of relying on a single monolithic agent, this pattern distributes tasks across dedicated agents with specific capabilities and tools, yielding more robust and scalable solutions.

Mental Model: The Orchestra Conductor Architecture

Think of multi-agent collaboration as conducting an orchestra inside a Next.js API route. Each agent is a specialist performer (violin = researcher, drums = data analyst, piano = writer) with its own instrument (tools/functions). The conductor (a supervisor agent or orchestrator) coordinates these specialists to produce a symphony (the complete solution). In Vercel's serverless environment, this means spinning up lightweight specialized functions that collaborate through shared state in Redis or Upstash, much like microservices communicating over a message queue, but with AI-driven decision-making at every node.
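Before introducing LangGraph, the conductor idea can be reduced to a few lines of plain TypeScript. This is an illustrative sketch only: the `SharedState` shape and the agent names are made up for this example, and a simple in-memory object stands in for the Redis/Upstash state a real deployment would use.

```typescript
// Shared state the "musicians" read and write; in production this
// would live in Redis/Upstash rather than in memory.
interface SharedState {
  task: string;
  notes: Record<string, string>; // each agent's contribution
}

type Agent = (state: SharedState) => SharedState;

// Two specialist agents: each reads the shared state and adds its part
const researcher: Agent = (state) => ({
  ...state,
  notes: { ...state.notes, researcher: `facts about ${state.task}` }
});

const writer: Agent = (state) => ({
  ...state,
  notes: { ...state.notes, writer: `report using ${state.notes.researcher}` }
});

// The "conductor": runs agents in order, threading state between them
function conduct(task: string, agents: Agent[]): SharedState {
  return agents.reduce((state, agent) => agent(state), {
    task,
    notes: {}
  } as SharedState);
}

const result = conduct("market trends", [researcher, writer]);
console.log(result.notes.writer); // the writer builds on the researcher's notes
```

The rest of this article replaces the hand-rolled `conduct` loop with a LangGraph state graph, which adds routing, persistence, and recursion limits on top of the same idea.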

Basic Example: The Supervisor-Worker Pattern

Let's build a simple multi-agent system in which a supervisor coordinates specialized workers to handle a research task.

1. Define Agent State and Types

// app/lib/agents/types.ts
import { Annotation } from "@langchain/langgraph";
import { BaseMessage } from "@langchain/core/messages";
import { z } from "zod";

// Define the state that flows between agents
export const AgentState = Annotation.Root({
  messages: Annotation<BaseMessage[]>({
    reducer: (x, y) => x.concat(y),
    default: () => []
  }),
  task: Annotation<string>(),
  researchData: Annotation<string>(),
  analysis: Annotation<string>(),
  finalOutput: Annotation<string>(),
  nextAgent: Annotation<string>()
});

export type AgentStateType = typeof AgentState.State;

// Agent configuration schema
export const AgentConfigSchema = z.object({
  name: z.string(),
  role: z.string(),
  temperature: z.number().default(0.7),
  maxTokens: z.number().default(1000)
});

This defines the shared state structure using LangGraph's Annotation system with proper TypeScript types.
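The `reducer` on the `messages` channel is what lets several agents append to the same list without overwriting each other. Here is a standalone sketch of that merge behavior in plain TypeScript (no LangGraph dependency; the `Channel` shape is a simplification of what `Annotation` sets up internally):

```typescript
// A channel pairs a reducer (how updates merge) with a default value,
// mirroring what Annotation<BaseMessage[]> does for `messages`.
interface Channel<T> {
  reducer: (current: T, update: T) => T;
  default: () => T;
}

const messagesChannel: Channel<string[]> = {
  reducer: (current, update) => current.concat(update),
  default: () => []
};

// Apply a sequence of agent updates through the reducer
function applyUpdates<T>(channel: Channel<T>, updates: T[]): T {
  return updates.reduce(channel.reducer, channel.default());
}

const merged = applyUpdates(messagesChannel, [
  ["supervisor: route to researcher"],
  ["researcher: found 3 sources"]
]);
// merged now holds both entries, in arrival order
```

Channels without a reducer (like `task` or `nextAgent` above) simply take the last value written, which is exactly what you want for routing decisions.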

2. Create Specialized Worker Agents

// app/lib/agents/workers.ts
import { ChatGoogleGenerativeAI } from "@langchain/google-genai";
import { DynamicStructuredTool } from "@langchain/core/tools";
import { z } from "zod";
import { AgentStateType } from "./types";

// Research agent with web search capability
export const researchAgent = async (state: AgentStateType) => {
  const model = new ChatGoogleGenerativeAI({
    model: "gemini-2.5-flash",
    temperature: 0.5,
    maxRetries: 2
  });

  const searchTool = new DynamicStructuredTool({
    name: "web_search",
    description: "Search the web for information",
    schema: z.object({
      query: z.string().describe("The search query")
    }),
    func: async ({ query }) => {
      // Simulated search - replace with a real API call
      await new Promise(resolve => setTimeout(resolve, 100));
      return `Found information about: ${query}`;
    }
  });

  const prompt = `You are a research specialist.
    Task: ${state.task}
    Use the search tool to gather relevant information.`;

  // Bind the tool so the model can actually request web_search calls
  const response = await model.bindTools([searchTool]).invoke(prompt);

  return {
    messages: [response],
    researchData: response.content as string
  };
};

// Analysis agent for data processing
export const analysisAgent = async (state: AgentStateType) => {
  const model = new ChatGoogleGenerativeAI({
    model: "gemini-2.5-flash",
    temperature: 0.3
  });

  const prompt = `You are a data analyst.
    Task: ${state.task}
    Research data: ${state.researchData}

    Analyze this data and provide insights.`;

  const response = await model.invoke(prompt);

  return {
    messages: [response],
    analysis: response.content as string
  };
};

// Writer agent for the final output
export const writerAgent = async (state: AgentStateType) => {
  const model = new ChatGoogleGenerativeAI({
    model: "gemini-2.5-pro",
    temperature: 0.7
  });

  const prompt = `You are a professional writer.
    Task: ${state.task}
    Research: ${state.researchData}
    Analysis: ${state.analysis}

    Create a comprehensive, well-structured response.`;

  const response = await model.invoke(prompt);

  return {
    messages: [response],
    finalOutput: response.content as string
  };
};

Each worker agent specializes in a specific task, with model settings and tools to match.

3. Implement the Supervisor

// app/lib/agents/supervisor.ts
import { ChatGoogleGenerativeAI } from "@langchain/google-genai";
import { JsonOutputParser } from "@langchain/core/output_parsers";
import { AgentStateType } from "./types";
import { END } from "@langchain/langgraph";

export const supervisorAgent = async (state: AgentStateType) => {
  const model = new ChatGoogleGenerativeAI({
    model: "gemini-2.5-flash",
    temperature: 0
  });

  // Decide the next agent based on the current state
  const systemPrompt = `You are a supervisor managing a team of agents.
    Current task: ${state.task}

    Available agents:
    - researcher: gathers information
    - analyst: analyzes data
    - writer: produces the final output

    Based on what has been completed so far:
    - Research data present: ${!!state.researchData}
    - Analysis present: ${!!state.analysis}
    - Final output present: ${!!state.finalOutput}

    Return JSON: { "nextAgent": "agent_name" or "finish" }`;

  const parser = new JsonOutputParser();
  const response = await model.invoke(systemPrompt);
  const decision = await parser.parse(response.content as string);

  return {
    nextAgent: decision.nextAgent === "finish" ? END : decision.nextAgent,
    messages: [response]
  };
};

The supervisor decides which agent to activate next based on the current state.
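The routing the supervisor prompt asks the LLM to perform is simple enough to mirror (and unit-test) as a pure function. The sketch below is a deterministic stand-in, not a LangGraph API: the field names match the `AgentState` defined earlier, and the pipeline order is the one the prompt encodes.

```typescript
// Minimal view of the state fields the supervisor inspects
interface RoutingState {
  researchData?: string;
  analysis?: string;
  finalOutput?: string;
}

// Same pipeline order the prompt encodes: research → analysis → writing
function decideNext(state: RoutingState): string {
  if (!state.researchData) return "researcher";
  if (!state.analysis) return "analyst";
  if (!state.finalOutput) return "writer";
  return "finish";
}

decideNext({});                                             // "researcher"
decideNext({ researchData: "facts" });                      // "analyst"
decideNext({ researchData: "facts", analysis: "insight" }); // "writer"
```

Using an LLM for this step only pays off when routing depends on content quality or ambiguous task descriptions; for a fixed pipeline like this one, a pure function is cheaper and fully predictable.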

4. Build the Multi-Agent Graph

// app/lib/agents/workflow.ts
import { StateGraph } from "@langchain/langgraph";
import { MemorySaver } from "@langchain/langgraph";
import { START, END } from "@langchain/langgraph";
import { AgentState } from "./types";
import { supervisorAgent } from "./supervisor";
import { researchAgent, analysisAgent, writerAgent } from "./workers";

export function createMultiAgentWorkflow() {
  const workflow = new StateGraph(AgentState)
    // Add a node for each agent
    .addNode("supervisor", supervisorAgent)
    .addNode("researcher", researchAgent)
    .addNode("analyst", analysisAgent)
    .addNode("writer", writerAgent)

    // Define the flow
    .addEdge(START, "supervisor")
    .addConditionalEdges(
      "supervisor",
      // Routing function driven by the supervisor's decision
      (state) => state.nextAgent || END,
      {
        researcher: "researcher",
        analyst: "analyst",
        writer: "writer",
        [END]: END
      }
    )

    // Workers report back to the supervisor
    .addEdge("researcher", "supervisor")
    .addEdge("analyst", "supervisor")
    .addEdge("writer", "supervisor");

  // Compile with a checkpointer for conversation persistence.
  // Note: recursionLimit (to prevent infinite loops) is a per-invocation
  // option, so it is passed in the config at invoke time (see the API
  // route) rather than at compile time.
  const memory = new MemorySaver();
  return workflow.compile({ checkpointer: memory });
}

The LangGraph workflow orchestrates agent interactions with proper state management.

5. Create the API Route

// app/api/multi-agent/route.ts
import { NextRequest, NextResponse } from "next/server";
import { createMultiAgentWorkflow } from "@/lib/agents/workflow";
import { HumanMessage } from "@langchain/core/messages";
import { pick } from "es-toolkit";

export const maxDuration = 300; // 5 minutes for long-running tasks

export async function POST(req: NextRequest) {
  try {
    const { task, threadId } = await req.json();

    // Initialize the workflow
    const workflow = createMultiAgentWorkflow();

    // Run the multi-agent collaboration
    const result = await workflow.invoke(
      {
        task,
        messages: [new HumanMessage(task)]
      },
      {
        configurable: { thread_id: threadId || "default" },
        recursionLimit: 10
      }
    );

    // Return only the relevant fields
    const response = pick(result, ["finalOutput", "analysis", "researchData"]);

    return NextResponse.json({
      success: true,
      result: response
    });
  } catch (error) {
    console.error("Multi-agent error:", error);
    return NextResponse.json(
      { error: "Processing failed" },
      { status: 500 }
    );
  }
}

The API route handles multi-agent orchestration using Vercel's extended timeout support.

6. Frontend Integration with React Query

// app/components/MultiAgentInterface.tsx
"use client";

import { useMutation } from "@tanstack/react-query";
import { useState } from "react";
import { debounce } from "es-toolkit";

interface MultiAgentResult {
  finalOutput: string;
  analysis: string;
  researchData: string;
}

async function runMultiAgentTask(task: string): Promise<MultiAgentResult> {
  const response = await fetch("/api/multi-agent", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ task, threadId: Date.now().toString() })
  });

  if (!response.ok) throw new Error("Agent execution failed");
  const data = await response.json();
  return data.result;
}

export default function MultiAgentInterface() {
  const [task, setTask] = useState("");

  const mutation = useMutation({
    mutationFn: runMultiAgentTask,
    onSuccess: (data) => {
      console.log("Task complete:", data);
    }
  });

  // Debounce only the mutation; preventDefault must run synchronously,
  // or the browser submits the form before the debounced callback fires
  const submitTask = debounce((value: string) => {
    if (value.trim()) {
      mutation.mutate(value);
    }
  }, 500);

  const handleSubmit = (e: React.FormEvent) => {
    e.preventDefault();
    submitTask(task);
  };

  return (
    <div className="card bg-base-100 shadow-xl">
      <div className="card-body">
        <h2 className="card-title">Multi-Agent Task Processor</h2>

        <form onSubmit={handleSubmit}>
          <textarea
            className="textarea textarea-bordered w-full"
            placeholder="Describe your task..."
            value={task}
            onChange={(e) => setTask(e.target.value)}
            rows={4}
          />

          <button
            type="submit"
            className="btn btn-primary mt-4"
            disabled={mutation.isPending}
          >
            {mutation.isPending ? (
              <span className="loading loading-spinner" />
            ) : (
              "Process Task"
            )}
          </button>
        </form>

        {mutation.data && (
          <div className="mt-6 space-y-4">
            <div className="collapse collapse-arrow bg-base-200">
              <input type="checkbox" defaultChecked />
              <div className="collapse-title font-medium">
                Final Output
              </div>
              <div className="collapse-content">
                <p>{mutation.data.finalOutput}</p>
              </div>
            </div>

            <div className="collapse collapse-arrow bg-base-200">
              <input type="checkbox" />
              <div className="collapse-title font-medium">
                Analysis
              </div>
              <div className="collapse-content">
                <p>{mutation.data.analysis}</p>
              </div>
            </div>
          </div>
        )}
      </div>
    </div>
  );
}

The React component manages multi-agent interactions with proper loading states and result display.

Advanced Example: Parallel Collaboration with Debate

Now let's implement a more sophisticated multi-agent system in which agents work in parallel and reach consensus through debate.

1. Define Advanced State with Voting

// app/lib/agents/advanced-types.ts
import { Annotation } from "@langchain/langgraph";
import { z } from "zod";

export const DebateOpinion = z.object({
  agent: z.string(),
  opinion: z.string(),
  confidence: z.number().min(0).max(1),
  reasoning: z.string()
});

export const DebateState = Annotation.Root({
  question: Annotation<string>(),
  // A default requires a reducer spec; last-value-wins for scalar channels
  round: Annotation<number>({ reducer: (_, y) => y, default: () => 0 }),
  opinions: Annotation<z.infer<typeof DebateOpinion>[]>({
    reducer: (x, y) => [...x, ...y],
    default: () => []
  }),
  consensus: Annotation<string>(),
  hasConverged: Annotation<boolean>({ reducer: (_, y) => y, default: () => false })
});

export type DebateStateType = typeof DebateState.State;

The advanced state tracks opinions, confidence scores, and consensus status.

2. Implement Parallel Expert Agents

// app/lib/agents/expert-agents.ts
import { ChatGoogleGenerativeAI } from "@langchain/google-genai";
import { DebateStateType } from "./advanced-types";

// Factory for expert agents
function createExpertAgent(expertise: string, model: string = "gemini-2.5-flash") {
  return async (state: DebateStateType): Promise<Partial<DebateStateType>> => {
    const llm = new ChatGoogleGenerativeAI({
      model,
      temperature: 0.7,
      maxRetries: 2
    });

    // Pull other agents' opinions from the previous round
    const otherOpinions = state.opinions
      .filter(op => op.agent !== expertise)
      .slice(-3); // the last 3 opinions from other experts

    const prompt = `You are an expert in ${expertise}.
      Question: ${state.question}
      Round: ${state.round}

      ${otherOpinions.length > 0 ? `
      Opinions from other experts:
      ${otherOpinions.map(op =>
        `${op.agent}: ${op.opinion} (confidence: ${op.confidence})`
      ).join('\n')}
      ` : ''}

      Provide your opinion with reasoning and a confidence score (0-1).
      Format: { "opinion": "...", "reasoning": "...", "confidence": 0.8 }`;

    const response = await llm.invoke(prompt);
    // Models often wrap JSON in markdown fences; strip them before parsing
    const raw = (response.content as string).replace(/```(?:json)?/g, "").trim();
    const parsed = JSON.parse(raw);

    return {
      opinions: [{
        agent: expertise,
        opinion: parsed.opinion,
        reasoning: parsed.reasoning,
        confidence: parsed.confidence
      }]
    };
  };
}

// Create the specialized experts
export const technicalExpert = createExpertAgent("technical analysis");
export const businessExpert = createExpertAgent("business strategy");
export const userExpert = createExpertAgent("user experience");
export const securityExpert = createExpertAgent("security", "gemini-2.5-pro");

Expert agents provide specialized perspectives with confidence scores.

3. Parallel Execution Coordinator

// app/lib/agents/parallel-coordinator.ts
import { DebateStateType } from "./advanced-types";
import {
  technicalExpert,
  businessExpert,
  userExpert,
  securityExpert
} from "./expert-agents";
import { groupBy, meanBy } from "es-toolkit";

export async function parallelExpertAnalysis(
  state: DebateStateType
): Promise<Partial<DebateStateType>> {
  const experts = [
    technicalExpert,
    businessExpert,
    userExpert,
    securityExpert
  ];

  // Run all experts in parallel, each with a timeout
  const expertPromises = experts.map(expert =>
    Promise.race([
      expert(state),
      new Promise<Partial<DebateStateType>>((_, reject) =>
        setTimeout(() => reject(new Error("Timeout")), 10000)
      )
    ])
  );

  const results = await Promise.allSettled(expertPromises);

  // Collect the opinions that succeeded
  const newOpinions = results
    .filter(r => r.status === "fulfilled")
    .flatMap(r => (r as PromiseFulfilledResult<any>).value.opinions || []);

  // Check for convergence
  const hasConverged = checkConvergence(newOpinions);

  return {
    opinions: newOpinions,
    round: state.round + 1,
    hasConverged
  };
}

function checkConvergence(opinions: any[]): boolean {
  if (opinions.length < 3) return false;

  // Group opinions by similarity (simplified keyword check)
  const groups = groupBy(opinions, op =>
    op.opinion.toLowerCase().includes("agree") ? "agree" : "disagree"
  );

  // Find the largest camp
  const largestGroup = Object.values(groups)
    .reduce((a, b) => (a.length > b.length ? a : b), [] as any[]);

  // Converged if >75% share a position and average confidence is >0.7
  const convergenceRatio = largestGroup.length / opinions.length;
  const avgConfidence = meanBy(largestGroup, op => op.confidence);

  return convergenceRatio > 0.75 && avgConfidence > 0.7;
}

The coordinator manages parallel execution with timeout handling and convergence detection.

4. Build Consensus Through Voting

// app/lib/agents/consensus.ts
import { ChatGoogleGenerativeAI } from "@langchain/google-genai";
import { DebateStateType } from "./advanced-types";
import { groupBy, maxBy, sortBy } from "es-toolkit";

export async function buildConsensus(
  state: DebateStateType
): Promise<Partial<DebateStateType>> {
  const model = new ChatGoogleGenerativeAI({
    model: "gemini-2.5-pro",
    temperature: 0.3
  });

  // Take each agent's highest-confidence opinion
  const recentOpinions = Object.values(
    groupBy(state.opinions, op => op.agent)
  ).map(group => maxBy(group, op => op.confidence));

  // Sort by confidence, descending (es-toolkit's sortBy takes an array of criteria)
  const sortedOpinions = sortBy(
    recentOpinions,
    [op => -(op?.confidence || 0)]
  );

  const prompt = `Synthesize these expert opinions into a consensus:
    Question: ${state.question}

    Expert opinions:
    ${sortedOpinions.map(op => `
      ${op?.agent} (confidence: ${op?.confidence}):
      Opinion: ${op?.opinion}
      Reasoning: ${op?.reasoning}
    `).join('\n')}

    Create a balanced consensus that covers each expert's key points.
    Weight the opinions by their confidence scores.`;

  const response = await model.invoke(prompt);

  return {
    consensus: response.content as string,
    hasConverged: true
  };
}

// Voting mechanism for deadlock resolution
export function majorityVote(opinions: any[]): string {
  const voteGroups = groupBy(opinions, op => op.opinion);
  const weightedVotes = Object.entries(voteGroups).map(([opinion, votes]) => ({
    opinion,
    totalWeight: votes.reduce((sum, v) => sum + v.confidence, 0)
  }));

  const winner = maxBy(weightedVotes, v => v.totalWeight);
  return winner?.opinion || "No consensus reached";
}

Consensus building synthesizes diverse opinions, weighted by confidence.
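To see how confidence weighting changes the outcome, here is a self-contained version of the same weighted vote with concrete numbers (the es-toolkit `groupBy`/`maxBy` calls are inlined so the sketch runs standalone; the logic mirrors `majorityVote` above):

```typescript
interface Vote {
  opinion: string;
  confidence: number;
}

// Sum the confidence of each distinct opinion and pick the heaviest
function weightedVote(opinions: Vote[]): string {
  const totals = new Map<string, number>();
  for (const { opinion, confidence } of opinions) {
    totals.set(opinion, (totals.get(opinion) ?? 0) + confidence);
  }

  let winner = "No consensus reached";
  let best = -Infinity;
  for (const [opinion, weight] of totals) {
    if (weight > best) {
      best = weight;
      winner = opinion;
    }
  }
  return winner;
}

// Two hesitant "adopt" votes (0.4 + 0.4 = 0.8) outweigh one
// confident "reject" vote (0.7)
weightedVote([
  { opinion: "adopt", confidence: 0.4 },
  { opinion: "adopt", confidence: 0.4 },
  { opinion: "reject", confidence: 0.7 }
]); // → "adopt"
```

Note the design choice this implies: a large group of low-confidence agents can outvote a single highly confident expert, which is usually what you want for deadlock resolution but worth keeping in mind for safety-critical opinions.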

5. The Complete Debate Workflow

// app/lib/agents/debate-workflow.ts
import { StateGraph, START, END } from "@langchain/langgraph";
import { DebateState } from "./advanced-types";
import { parallelExpertAnalysis } from "./parallel-coordinator";
import { buildConsensus } from "./consensus";

export function createDebateWorkflow() {
  const MAX_ROUNDS = 5;

  const workflow = new StateGraph(DebateState)
    .addNode("parallel_experts", parallelExpertAnalysis)
    .addNode("consensus", buildConsensus)

    // Start with parallel expert analysis
    .addEdge(START, "parallel_experts")

    // Conditional routing based on convergence
    .addConditionalEdges(
      "parallel_experts",
      (state) => {
        if (state.hasConverged || state.round >= MAX_ROUNDS) {
          return "consensus";
        }
        return "parallel_experts"; // keep debating
      },
      {
        parallel_experts: "parallel_experts",
        consensus: "consensus"
      }
    )

    .addEdge("consensus", END);

  // recursionLimit is a runtime option: pass it in the invoke/stream
  // config (e.g. { recursionLimit: MAX_ROUNDS * 2 }) rather than at compile time
  return workflow.compile();
}

The workflow implements iterative debate with automatic convergence detection.

6. Streaming API with Real-Time Updates

// app/api/debate/route.ts
import { NextRequest } from "next/server";
import { createDebateWorkflow } from "@/lib/agents/debate-workflow";

export const maxDuration = 300;

export async function POST(req: NextRequest) {
  const { question } = await req.json();

  const encoder = new TextEncoder();
  const stream = new ReadableStream({
    async start(controller) {
      const workflow = createDebateWorkflow();

      try {
        // Stream events as they happen, keeping the latest state
        let lastState: any = null;
        for await (const event of workflow.stream(
          { question, round: 0 },
          { streamMode: "values", recursionLimit: 10 }
        )) {
          lastState = event;
          const data = JSON.stringify({
            type: "update",
            round: event.round,
            opinions: event.opinions?.slice(-4), // the last 4 opinions
            hasConverged: event.hasConverged
          });

          controller.enqueue(
            encoder.encode(`data: ${data}\n\n`)
          );
        }

        // Send the final consensus from the last streamed state
        // (re-invoking the workflow here would run the entire debate again)
        controller.enqueue(
          encoder.encode(`data: ${JSON.stringify({
            type: "complete",
            consensus: lastState?.consensus
          })}\n\n`)
        );
      } catch (error) {
        controller.enqueue(
          encoder.encode(`data: ${JSON.stringify({
            type: "error",
            message: "Debate failed"
          })}\n\n`)
        );
      } finally {
        controller.close();
      }
    }
  });

  return new Response(stream, {
    headers: {
      "Content-Type": "text/event-stream",
      "Cache-Control": "no-cache",
      "Connection": "keep-alive"
    }
  });
}

The streaming API delivers real-time updates as the debate progresses.
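On the consuming side, the `data: ...\n\n` frames this route emits have to be reassembled, because a network chunk can end in the middle of a frame. A small buffered parser sketch (plain TypeScript; the function name is illustrative, and the payloads match what this route sends):

```typescript
// Feed raw chunks in; get back completed `data:` payloads, keeping
// any trailing partial line in the buffer for the next chunk.
function createSSEParser() {
  let buffer = "";

  return function feed(chunk: string): string[] {
    buffer += chunk;
    const lines = buffer.split("\n");
    buffer = lines.pop() ?? ""; // last element may be an incomplete line

    return lines
      .filter(line => line.startsWith("data: "))
      .map(line => line.slice(6));
  };
}

const feed = createSSEParser();
// A frame split across two chunks is only emitted once it is complete
const first = feed('data: {"type":"upd');
const second = feed('ate","round":1}\n\n');
// first → [], second → ['{"type":"update","round":1}']
```

The component in the next step uses the same idea inline; without the buffer, a JSON payload cut in half by a chunk boundary would throw in `JSON.parse`.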

7. Real-Time Debate Visualization

// app/components/DebateVisualization.tsx
"use client";

import { useEffect, useState } from "react";
import { groupBy, sortBy } from "es-toolkit";

interface Opinion {
  agent: string;
  opinion: string;
  confidence: number;
  reasoning: string;
}

export default function DebateVisualization() {
  const [question, setQuestion] = useState("");
  const [isDebating, setIsDebating] = useState(false);
  const [rounds, setRounds] = useState<Opinion[][]>([]);
  const [consensus, setConsensus] = useState<string>("");

  const startDebate = async () => {
    setIsDebating(true);
    setRounds([]);
    setConsensus("");

    const response = await fetch("/api/debate", {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify({ question })
    });

    const reader = response.body?.getReader();
    const decoder = new TextDecoder();
    let buffer = "";

    while (reader) {
      const { done, value } = await reader.read();
      if (done) break;

      // SSE frames can be split across network chunks, so buffer partial lines
      buffer += decoder.decode(value, { stream: true });
      const lines = buffer.split('\n');
      buffer = lines.pop() ?? "";

      for (const line of lines) {
        if (line.startsWith('data: ')) {
          const data = JSON.parse(line.slice(6));

          if (data.type === "update" && data.opinions) {
            setRounds(prev => [...prev, data.opinions]);
          } else if (data.type === "complete") {
            setConsensus(data.consensus);
            setIsDebating(false);
          }
        }
      }
    }
  };

  return (
    <div className="container mx-auto p-4">
      <div className="card bg-base-100 shadow-xl mb-6">
        <div className="card-body">
          <h2 className="card-title">Multi-Agent Debate System</h2>

          <input
            type="text"
            placeholder="Enter a debate question..."
            className="input input-bordered w-full"
            value={question}
            onChange={(e) => setQuestion(e.target.value)}
          />

          <button
            className="btn btn-primary"
            onClick={startDebate}
            disabled={isDebating || !question}
          >
            {isDebating ? (
              <>
                <span className="loading loading-spinner" />
                Debating...
              </>
            ) : (
              "Start Debate"
            )}
          </button>
        </div>
      </div>

      {/* Debate round visualization */}
      {rounds.length > 0 && (
        <div className="grid grid-cols-1 md:grid-cols-2 gap-4 mb-6">
          {rounds[rounds.length - 1].map((opinion, idx) => (
            <div key={idx} className="card bg-base-200">
              <div className="card-body">
                <div className="flex justify-between items-center">
                  <h3 className="font-bold">{opinion.agent}</h3>
                  <div className="badge badge-primary">
                    {Math.round(opinion.confidence * 100)}% confidence
                  </div>
                </div>
                <p className="text-sm mt-2">{opinion.opinion}</p>
                <progress
                  className="progress progress-primary w-full"
                  value={opinion.confidence}
                  max="1"
                />
              </div>
            </div>
          ))}
        </div>
      )}

      {/* Consensus result */}
      {consensus && (
        <div className="alert alert-success">
          <div>
            <h3 className="font-bold">Consensus Reached:</h3>
            <p>{consensus}</p>
          </div>
        </div>
      )}
    </div>
  );
}

The real-time visualization shows the debate's progress with confidence indicators.

Conclusion

Multi-agent collaboration transforms complex problem-solving by distributing tasks across specialized agents. The supervisor-worker pattern provides centralized control that is ideal for customer service and research tasks, while parallel debate systems excel at gathering diverse perspectives for critical decisions. Key considerations include managing coordination overhead (keep teams under seven agents), implementing proper timeout handling for serverless environments, and using confidence-weighted voting for consensus building. With LangGraph's state orchestration and Vercel's serverless infrastructure, you can build production-ready multi-agent systems that maintain quality while delivering 2-10x productivity gains through sophisticated coordination patterns.