Draft: Agentic Design Patterns - Reasoning Techniques

agentic-design, reasoning, langchain, langgraph, ai, typescript
By sko X opus 4.1 | 9/21/2025 | 16 min read

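Learn how to implement advanced reasoning techniques, including Chain-of-Thought (CoT), Tree-of-Thought (ToT), Graph-of-Thought (GoT), and self-reflection patterns, in a serverless environment using LangChain, LangGraph, and TypeScript.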

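Mental Model: The Symphony Orchestra

Think of reasoning techniques as a symphony orchestra in which different sections (reasoning patterns) work together to create complex harmonies (solutions). The conductor (LangGraph) coordinates the instruments (agents): the strings handle linear reasoning (CoT), the woodwinds explore variations (ToT), the brass provides feedback (self-reflection), and the percussion keeps the rhythm (state management). Just as musicians can play solo or in ensemble, agents can reason independently or collaborate for richer results.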

Basic Example: Chain-of-Thought Reasoning

1. Setting Up a Basic CoT Agent

// app/agents/cot-agent/route.ts
import { ChatGoogleGenerativeAI } from "@langchain/google-genai";
import { StateGraph, Annotation } from "@langchain/langgraph";
import { HumanMessage, BaseMessage } from "@langchain/core/messages";

// Define the reasoning state
const ReasoningState = Annotation.Root({
  messages: Annotation<BaseMessage[]>({
    reducer: (x, y) => x.concat(y),
    default: () => [],
  }),
  reasoning_steps: Annotation<string[]>({
    reducer: (x, y) => x.concat(y),
    default: () => [],
  }),
  final_answer: Annotation<string>({
    reducer: (_, y) => y,
    default: () => "",
  }),
});

// Create the model that drives the reasoning chain
const model = new ChatGoogleGenerativeAI({
  modelName: "gemini-pro",
  temperature: 0.7,
});

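// CoT reasoning node
async function reasoningNode(state: typeof ReasoningState.State) {
  const cotPrompt = `You are a reasoning agent. Break the problem down step by step.

Question: ${state.messages[state.messages.length - 1].content}

Think through it in steps:
1. First, identify the problem we are solving
2. Break it into smaller parts
3. Solve each part
4. Combine the solutions

Reply in the following format:
Step 1: [reasoning]
Step 2: [reasoning]
...
Final Answer: [answer]`;

  const response = await model.invoke([
    new HumanMessage(cotPrompt)
  ]);

  // Parse the response; the patterns must match the format requested in the prompt
  const content = response.content as string;
  const steps = content.match(/Step \d+:\s*(.+)/g) || [];
  const finalAnswer = content.match(/Final Answer:\s*(.+)/)?.[1] || "";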

  return {
    messages: [response],
    reasoning_steps: steps,
    final_answer: finalAnswer,
  };
}

// Build the graph
const workflow = new StateGraph(ReasoningState)
  .addNode("reason", reasoningNode)
  .addEdge("__start__", "reason")
  .addEdge("reason", "__end__");

const app = workflow.compile();

// API route handler
export async function POST(request: Request) {
  const { question } = await request.json();

  const result = await app.invoke({
    messages: [new HumanMessage(question)],
  });

  return Response.json({
    reasoning_steps: result.reasoning_steps,
    final_answer: result.final_answer,
  });
}

The basic CoT agent breaks a problem into sequential reasoning steps. Each step builds on the previous one, forming a logical chain of thought.
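The parsing step is where this pattern tends to be fragile, so here is a minimal sketch of it in isolation. It assumes the model follows a `Step N:` / `Final Answer:` line format; `parseCotResponse` and the sample reply are hypothetical and not part of the route above.

```typescript
interface ParsedReasoning {
  steps: string[];
  finalAnswer: string;
}

// Hypothetical helper: split a chain-of-thought reply into steps and a final answer.
// Lines that match neither pattern are ignored.
function parseCotResponse(content: string): ParsedReasoning {
  const steps: string[] = [];
  let finalAnswer = "";

  for (const rawLine of content.split("\n")) {
    const line = rawLine.trim();
    const stepMatch = line.match(/^Step \d+:\s*(.+)$/);
    if (stepMatch) {
      steps.push(stepMatch[1]);
      continue;
    }
    const answerMatch = line.match(/^Final Answer:\s*(.+)$/);
    if (answerMatch) {
      finalAnswer = answerMatch[1];
    }
  }

  return { steps, finalAnswer };
}

// Made-up reply used only to exercise the parser.
const sample = [
  "Step 1: A train covers 120 km in 2 hours.",
  "Step 2: Speed is distance divided by time, so 120 / 2 = 60.",
  "Final Answer: 60 km/h",
].join("\n");

const parsed = parseCotResponse(sample);
console.log(parsed.steps.length, parsed.finalAnswer); // prints: 2 60 km/h
```

When the reply does not follow the format, the helper returns an empty step list and an empty answer, which a caller can treat as a signal to retry or to fall back to the raw text.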

2. Frontend Integration with React Query

// app/components/ReasoningAgent.tsx
'use client';

import { useMutation } from '@tanstack/react-query';
import { useState } from 'react';
// `map` is part of the lodash-compatible entry point, not the main es-toolkit export
import { map } from 'es-toolkit/compat';

interface ReasoningResponse {
  reasoning_steps: string[];
  final_answer: string;
}

async function queryReasoning(question: string): Promise<ReasoningResponse> {
  const response = await fetch('/agents/cot-agent', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ question }),
  });
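  // Surface HTTP errors so React Query marks the mutation as failed
  if (!response.ok) {
    throw new Error(`Reasoning request failed with status ${response.status}`);
  }
  return response.json();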
}

export function ReasoningAgent() {
  const [question, setQuestion] = useState('');

  const mutation = useMutation({
    mutationFn: queryReasoning,
  });

  return (
    <div className="card bg-base-100 shadow-xl">
      <div className="card-body">
        <h2 className="card-title">Chain-of-Thought Reasoning</h2>

        <textarea
          className="textarea textarea-bordered"
          placeholder="Ask a complex question..."
          value={question}
          onChange={(e) => setQuestion(e.target.value)}
        />

        <button
          className="btn btn-primary"
          onClick={() => mutation.mutate(question)}
          disabled={mutation.isPending}
        >
          {mutation.isPending ? 'Reasoning...' : 'Start reasoning'}
        </button>

        {mutation.isSuccess && (
          <div className="mt-4 space-y-2">
            <div className="text-sm opacity-70">Reasoning steps:</div>
            {map(mutation.data.reasoning_steps, (step, idx) => (
              <div key={idx} className="p-2 bg-base-200 rounded">
                {step}
              </div>
            ))}
            <div className="divider"></div>
            <div className="alert alert-success">
              <span>{mutation.data.final_answer}</span>
            </div>
          </div>
        )}
      </div>
    </div>
  );
}

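React Query manages the asynchronous request state, and once the request succeeds the UI renders the returned reasoning steps in order, followed by the final answer.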
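Because the component trusts whatever JSON the route returns, it can help to validate the shape before rendering. The sketch below is one way to do that with a plain type guard; `isReasoningResponse` is a hypothetical helper and not something the original component includes.

```typescript
interface ReasoningResponse {
  reasoning_steps: string[];
  final_answer: string;
}

// Hypothetical runtime check: true only when the value has the shape the UI expects.
function isReasoningResponse(value: unknown): value is ReasoningResponse {
  if (typeof value !== "object" || value === null) return false;
  const candidate = value as Record<string, unknown>;
  return (
    Array.isArray(candidate.reasoning_steps) &&
    candidate.reasoning_steps.every((step) => typeof step === "string") &&
    typeof candidate.final_answer === "string"
  );
}

const ok = isReasoningResponse({ reasoning_steps: ["Step 1: ..."], final_answer: "42" });
const bad = isReasoningResponse({ error: "Internal Server Error" });
console.log(ok, bad); // prints: true false
```

Throwing from the fetch function when the guard fails would put the mutation into its error state instead of letting the component read fields that do not exist.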

Advanced Example: Multi-Agent Reasoning with ToT and Self-Reflection

1. Tree-of-Thought with Parallel Exploration

// app/agents/advanced-reasoning/tree-of-thought.ts
import { StateGraph, Annotation, Send } from "@langchain/langgraph";
import { ChatGoogleGenerativeAI } from "@langchain/google-genai";
import { BaseMessage, HumanMessage, AIMessage } from "@langchain/core/messages";
import { z } from "zod";
import { maxBy, sortBy } from "es-toolkit";
// `filter` and `map` live in the lodash-compatible entry point
import { filter, map } from "es-toolkit/compat";
import { RunnableConfig } from "@langchain/core/runnables";

// Define the structure of a thought node
const ThoughtSchema = z.object({
  id: z.string(),
  content: z.string(),
  score: z.number(),
  parent_id: z.string().optional(),
  depth: z.number(),
});

type Thought = z.infer<typeof ThoughtSchema>;

// Define the state that holds the tree of thoughts
const ToTState = Annotation.Root({
  problem: Annotation<string>(),
  thoughts: Annotation<Thought[]>({
    // Nodes return the full, updated list (scoring and pruning rewrite it),
    // so the reducer replaces the previous value instead of appending to it
    reducer: (_, y) => y,
    default: () => [],
  }),
  current_depth: Annotation<number>({
    reducer: (_, y) => y,
    default: () => 0,
  }),
  best_solution: Annotation<string>({
    reducer: (_, y) => y,
    default: () => "",
  }),
  exploration_paths: Annotation<string[][]>({
    reducer: (x, y) => [...x, ...y],
    default: () => [],
  }),
});

const model = new ChatGoogleGenerativeAI({
  modelName: "gemini-pro",
  temperature: 0.8,
});

// Generate several thoughts in parallel
async function generateThoughts(
  state: typeof ToTState.State,
  config: RunnableConfig
): Promise<Partial<typeof ToTState.State>> {
  const parentThoughts = filter(
    state.thoughts,
    (t) => t.depth === state.current_depth - 1
  );

  // If there are no parent thoughts, start from the original problem
  const prompts = parentThoughts.length > 0
    ? map(parentThoughts, (parent) => ({
        parent_id: parent.id,
        prompt: `Based on this reasoning step: "${parent.content}"
        Generate 3 different approaches to solving: ${state.problem}
        Be creative and explore different directions.`,
      }))
    : [{
        parent_id: "root",
        prompt: `Problem: ${state.problem}
        Generate 3 different initial approaches to solving this problem.`,
      }];

  // Generate thoughts for every prompt in parallel
  const thoughtPromises = map(prompts, async ({ parent_id, prompt }) => {
    const response = await model.invoke([new HumanMessage(prompt)]);
    const content = response.content as string;

    // Parse individual thoughts from the response (one per non-empty line)
    const thoughtTexts = content.split('\n').filter(line => line.trim());

    return map(thoughtTexts.slice(0, 3), (text, idx) => ({
      id: `${parent_id}-${state.current_depth}-${idx}`,
      content: text,
      score: 0,
      parent_id,
      depth: state.current_depth,
    }));
  });

  const allThoughts = (await Promise.all(thoughtPromises)).flat();

  // Return the existing thoughts plus the new ones, since the reducer replaces the list
  return {
    thoughts: [...state.thoughts, ...allThoughts],
  };
}

// Score each thought on its quality
async function scoreThoughts(
  state: typeof ToTState.State
): Promise<Partial<typeof ToTState.State>> {
  const currentThoughts = filter(
    state.thoughts,
    (t) => t.depth === state.current_depth && t.score === 0
  );

  const scoringPrompt = `Evaluate these approaches to solving the problem: "${state.problem}"
  Rate each from 0-10 based on:
  - Logical coherence
  - Progress toward a solution
  - Creativity

  Approaches:
  ${map(currentThoughts, (t, i) => `${i + 1}. ${t.content}`).join('\n')}

  Return the scores in the format: [score1, score2, ...]`;

  const response = await model.invoke([new HumanMessage(scoringPrompt)]);

  // Accept decimals as well as integers, and fall back to no scores if parsing fails
  let scores: number[] = [];
  try {
    scores = JSON.parse((response.content as string).match(/\[[\d.,\s]+\]/)?.[0] || "[]");
  } catch {
    scores = [];
  }

  const scoredThoughts = map(currentThoughts, (thought, idx) => ({
    ...thought,
    score: scores[idx] || 0,
  }));

  // Replace only the thoughts that were just scored
  const updatedThoughts = map(state.thoughts, (t) => {
    const scored = scoredThoughts.find(st => st.id === t.id);
    return scored || t;
  });

  return {
    thoughts: updatedThoughts,
  };
}

// Prune low-scoring branches
async function pruneThoughts(
  state: typeof ToTState.State
): Promise<Partial<typeof ToTState.State>> {
  const currentThoughts = filter(
    state.thoughts,
    (t) => t.depth === state.current_depth
  );

  // Keep the top 40% of thoughts at this depth, but never fewer than 2
  const sorted = sortBy(currentThoughts, [(t) => -t.score]);
  const keepCount = Math.max(2, Math.floor(sorted.length * 0.4));
  const keepIds = new Set(map(sorted.slice(0, keepCount), t => t.id));

  const prunedThoughts = filter(state.thoughts, (t) =>
    t.depth < state.current_depth || keepIds.has(t.id)
  );

  return {
    thoughts: prunedThoughts,
    // Advance to the next level; without this the expand loop never terminates
    current_depth: state.current_depth + 1,
  };
}

// Decide whether to keep exploring
function shouldContinue(state: typeof ToTState.State): "expand" | "synthesize" {
  const MAX_DEPTH = 3;
  return state.current_depth < MAX_DEPTH ? "expand" : "synthesize";
}

// Synthesize the final solution from the best path
async function synthesizeSolution(
  state: typeof ToTState.State
): Promise<Partial<typeof ToTState.State>> {
  // Find the best leaf. current_depth was advanced after the last prune,
  // so the deepest generated level is current_depth - 1
  const leafThoughts = filter(
    state.thoughts,
    (t) => t.depth === state.current_depth - 1
  );

  const bestLeaf = maxBy(leafThoughts, (t) => t.score);

  if (!bestLeaf) {
    return { best_solution: "No solution found" };
  }

  // Walk back up to the root
  const path: Thought[] = [];
  let current: Thought | undefined = bestLeaf;

  while (current) {
    path.unshift(current);
    current = state.thoughts.find(t => t.id === current?.parent_id);
  }

  const synthesisPrompt = `Synthesize a complete solution from this reasoning path:
  ${map(path, (t, i) => `Step ${i + 1}: ${t.content}`).join('\n')}

  Provide a clear, comprehensive answer to: ${state.problem}`;

  const response = await model.invoke([new HumanMessage(synthesisPrompt)]);

  return {
    best_solution: response.content as string,
    exploration_paths: [map(path, t => t.content)],
  };
}

// Build the Tree-of-Thought workflow
export function createToTWorkflow() {
  const workflow = new StateGraph(ToTState)
    .addNode("generate", generateThoughts)
    .addNode("score", scoreThoughts)
    .addNode("prune", pruneThoughts)
    .addNode("synthesize", synthesizeSolution)
    .addEdge("__start__", "generate")
    .addEdge("generate", "score")
    .addEdge("score", "prune")
    .addConditionalEdges(
      "prune",
      shouldContinue,
      {
        expand: "generate",
        synthesize: "synthesize",
      }
    )
    .addEdge("synthesize", "__end__");

  return workflow.compile();
}

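Tree-of-Thought expands several candidate reasoning paths at each level, then scores and prunes them so that only the strongest branches are explored further.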
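The keep-the-best rule is easier to reason about on its own. The sketch below reproduces only the selection arithmetic (top 40%, never fewer than 2) with made-up scores; `keepTopThoughts` and the sample ids are hypothetical, and no model is involved.

```typescript
interface ScoredThought {
  id: string;
  score: number;
}

// Keep the highest-scoring fraction of thoughts, but never fewer than minKeep.
function keepTopThoughts<T extends ScoredThought>(
  thoughts: T[],
  keepFraction = 0.4,
  minKeep = 2,
): T[] {
  // Copy before sorting so the caller's array is left untouched.
  const sorted = [...thoughts].sort((a, b) => b.score - a.score);
  const keepCount = Math.max(minKeep, Math.floor(sorted.length * keepFraction));
  return sorted.slice(0, keepCount);
}

// Made-up scores for one level of the tree.
const level: ScoredThought[] = [
  { id: "root-0-0", score: 6 },
  { id: "root-0-1", score: 9 },
  { id: "root-0-2", score: 3 },
  { id: "root-0-3", score: 8 },
  { id: "root-0-4", score: 5 },
];

const kept = keepTopThoughts(level);
console.log(kept.map((t) => t.id).join(", ")); // prints: root-0-1, root-0-3
```

With three children per surviving parent, a level of 6 candidates is also cut back to 2 under these defaults, which keeps the number of model calls per level roughly constant rather than growing with depth.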

2. Self-Reflection Agent for Solution Refinement

// app/agents/advanced-reasoning/self-reflection.ts
import { StateGraph, Annotation } from "@langchain/langgraph";
import { ChatGoogleGenerativeAI } from "@langchain/google-genai";
import { BaseMessage, HumanMessage } from "@langchain/core/messages";
import { z } from "zod";
import { last } from "es-toolkit";

// Reflection state, including the history of critiques
const ReflectionState = Annotation.Root({
  problem: Annotation<string>(),
  current_solution: Annotation<string>({
    reducer: (_, y) => y,
  }),
  critiques: Annotation<Array<{
    critique: string;
    suggestions: string[];
    score: number;
  }>>({
    reducer: (x, y) => [...x, ...y],
    default: () => [],
  }),
  iteration: Annotation<number>({
    reducer: (_, y) => y,
    default: () => 0,
  }),
  final_solution: Annotation<string>({
    reducer: (_, y) => y,
    default: () => "",
  }),
});

const model = new ChatGoogleGenerativeAI({
  modelName: "gemini-pro",
  temperature: 0.7,
});

// Actor: generate a solution, or revise the previous one when a critique exists
async function generateSolution(
  state: typeof ReflectionState.State
): Promise<Partial<typeof ReflectionState.State>> {
  const lastCritique = last(state.critiques);

  const prompt = lastCritique
    ? `Problem: ${state.problem}

      Previous solution: ${state.current_solution}

      Critique: ${lastCritique.critique}
      Suggestions: ${lastCritique.suggestions.join(', ')}

      Generate an improved solution that addresses these issues.`
    : `Solve this problem: ${state.problem}

      Provide a detailed solution with clear reasoning.`;

  const response = await model.invoke([new HumanMessage(prompt)]);

  return {
    current_solution: response.content as string,
    iteration: state.iteration + 1,
  };
}

// Evaluator: critique the solution
async function evaluateSolution(
  state: typeof ReflectionState.State
): Promise<Partial<typeof ReflectionState.State>> {
  const critiquePrompt = `Critically evaluate this solution:

  Problem: ${state.problem}
  Solution: ${state.current_solution}

  Provide:
  1. A detailed critique that identifies weaknesses
  2. Specific suggestions for improvement
  3. A score from 0-10

  Respond in JSON format: {
    "critique": "...",
    "suggestions": ["...", "..."],
    "score": 0
  }`;

  const response = await model.invoke([new HumanMessage(critiquePrompt)]);
  const raw = response.content as string;

  try {
    // Models often wrap JSON in a Markdown fence, so parse only the outermost {...} span
    const evaluation = JSON.parse(raw.match(/\{[\s\S]*\}/)?.[0] ?? raw);

    return {
      critiques: [evaluation],
    };
  } catch {
    // Fallback when the JSON cannot be parsed
    return {
      critiques: [{
        critique: raw,
        suggestions: [],
        score: 5,
      }],
    };
  }
}

// Check whether the solution is good enough
function shouldContinueReflection(
  state: typeof ReflectionState.State
): "reflect" | "finalize" {
  const MAX_ITERATIONS = 3;
  const GOOD_SCORE = 8;

  const lastCritique = last(state.critiques);

  if (state.iteration >= MAX_ITERATIONS) {
    return "finalize";
  }

  if (lastCritique && lastCritique.score >= GOOD_SCORE) {
    return "finalize";
  }

  return "reflect";
}

// Finalize the best solution
async function finalizeSolution(
  state: typeof ReflectionState.State
): Promise<Partial<typeof ReflectionState.State>> {
  return {
    final_solution: state.current_solution,
  };
}

// Build the self-reflection workflow
export function createReflectionWorkflow() {
  const workflow = new StateGraph(ReflectionState)
    .addNode("generate", generateSolution)
    .addNode("evaluate", evaluateSolution)
    .addNode("finalize", finalizeSolution)
    .addEdge("__start__", "generate")
    .addEdge("generate", "evaluate")
    .addConditionalEdges(
      "evaluate",
      shouldContinueReflection,
      {
        reflect: "generate",
        finalize: "finalize",
      }
    )
    .addEdge("finalize", "__end__");

  return workflow.compile();
}

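Self-reflection improves a solution iteratively through a loop of critique and revision, stopping once the critique score reaches the threshold or the iteration limit is hit.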
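The control flow of that loop can be sketched without LangGraph or a model. In the version below the two callbacks are deterministic stand-ins for the actor and evaluator calls, and `reflect` is a hypothetical name; the defaults of 3 iterations and a score of 8 mirror the constants used in the workflow.

```typescript
interface Critique {
  score: number;
  suggestions: string[];
}

interface ReflectionResult {
  solution: string;
  iterations: number;
  scores: number[];
}

// Generic generate -> evaluate loop. Stops early once a critique scores
// at or above goodScore, otherwise after maxIterations rounds.
function reflect(
  generate: (previous: string | null, critique: Critique | null) => string,
  evaluate: (solution: string) => Critique,
  maxIterations = 3,
  goodScore = 8,
): ReflectionResult {
  let solution: string | null = null;
  let critique: Critique | null = null;
  const scores: number[] = [];

  for (let iteration = 1; iteration <= maxIterations; iteration++) {
    solution = generate(solution, critique);
    critique = evaluate(solution);
    scores.push(critique.score);
    if (critique.score >= goodScore) {
      return { solution, iterations: iteration, scores };
    }
  }

  return { solution: solution ?? "", iterations: maxIterations, scores };
}

// Stubs (assumptions for illustration): each revision appends "+fix",
// and the score rises by 2 for every part of the solution.
const result = reflect(
  (previous) => (previous === null ? "draft" : `${previous}+fix`),
  (solution) => ({ score: 4 + solution.split("+").length * 2, suggestions: ["add detail"] }),
);

console.log(result.solution, result.iterations, result.scores.join(",")); // prints: draft+fix 2 6,8
```

The first draft scores 6 and triggers a revision; the revision scores 8 and ends the loop after two iterations, one short of the limit.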

3. Orchestrating Multiple Reasoning Patterns

// app/agents/advanced-reasoning/orchestrator.ts
import { StateGraph, Annotation, Send } from "@langchain/langgraph";
import { createToTWorkflow } from "./tree-of-thought";
import { createReflectionWorkflow } from "./self-reflection";
import { ChatGoogleGenerativeAI } from "@langchain/google-genai";
import { HumanMessage } from "@langchain/core/messages";
import { z } from "zod";
// `map` is exported from the lodash-compatible entry point
import { map } from "es-toolkit/compat";

// Main orchestrator state
const OrchestratorState = Annotation.Root({
  problem: Annotation<string>(),
  problem_type: Annotation<string>({
    reducer: (_, y) => y,
    default: () => "unknown",
  }),
  tot_result: Annotation<{
    solution: string;
    paths: string[][];
  }>(),
  reflection_result: Annotation<{
    solution: string;
    iterations: number;
  }>(),
  final_answer: Annotation<string>({
    reducer: (_, y) => y,
    default: () => "",
  }),
  reasoning_method: Annotation<string>({
    reducer: (_, y) => y,
    default: () => "",
  }),
});

const model = new ChatGoogleGenerativeAI({
  modelName: "gemini-pro",
  temperature: 0.3,
});

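// Classify the problem to choose a reasoning strategy
async function classifyProblem(
  state: typeof OrchestratorState.State
): Promise<Partial<typeof OrchestratorState.State>> {
  const classificationPrompt = `Classify this problem to determine the best reasoning approach:

  Problem: ${state.problem}

  Categories:
  - "exploratory": problems that need multiple solution paths (use Tree-of-Thought)
  - "refinement": problems that need iterative improvement (use self-reflection)
  - "hybrid": complex problems that need both exploration and refinement

  Return only the category name.`;

  const response = await model.invoke([new HumanMessage(classificationPrompt)]);
  // Drop quotes, punctuation, and whitespace so the label matches the router's cases
  const problemType = (response.content as string).toLowerCase().replace(/[^a-z]/g, "");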

  return {
    problem_type: problemType,
  };
}

// Run Tree-of-Thought reasoning
async function runToT(
  state: typeof OrchestratorState.State
): Promise<Partial<typeof OrchestratorState.State>> {
  const totWorkflow = createToTWorkflow();

  const result = await totWorkflow.invoke({
    problem: state.problem,
    thoughts: [],
    current_depth: 0,
  });

  return {
    tot_result: {
      solution: result.best_solution,
      paths: result.exploration_paths,
    },
    reasoning_method: "Tree-of-Thought",
  };
}

// Run self-reflection reasoning
async function runReflection(
  state: typeof OrchestratorState.State
): Promise<Partial<typeof OrchestratorState.State>> {
  const reflectionWorkflow = createReflectionWorkflow();

  const result = await reflectionWorkflow.invoke({
    problem: state.problem,
    current_solution: "",
    critiques: [],
    iteration: 0,
  });

  return {
    reflection_result: {
      solution: result.final_solution,
      iterations: result.iteration,
    },
    reasoning_method: "Self-Reflection",
  };
}

// Merge the results of the reasoning methods that ran
async function synthesizeResults(
  state: typeof OrchestratorState.State
): Promise<Partial<typeof OrchestratorState.State>> {
  const solutions: Array<{ method: string; solution: string }> = [];

  if (state.tot_result) {
    solutions.push({
      method: "Tree-of-Thought",
      solution: state.tot_result.solution,
    });
  }

  if (state.reflection_result) {
    solutions.push({
      method: "Self-Reflection",
      solution: state.reflection_result.solution,
    });
  }

  if (solutions.length === 1) {
    return {
      final_answer: solutions[0].solution,
    };
  }

  // When there are multiple solutions, synthesize them
  const synthesisPrompt = `Synthesize these solutions into one comprehensive answer:

  Problem: ${state.problem}

  ${map(solutions, s => `${s.method} solution:\n${s.solution}`).join('\n\n')}

  Create a final answer that combines the best insights from each approach.`;

  const response = await model.invoke([new HumanMessage(synthesisPrompt)]);

  return {
    final_answer: response.content as string,
    reasoning_method: "Hybrid (ToT + self-reflection)",
  };
}

// Route based on the problem type
function routeReasoning(state: typeof OrchestratorState.State): string | string[] {
  switch (state.problem_type) {
    case "exploratory":
      return "tot";
    case "refinement":
      return "reflection";
    case "hybrid":
      return ["tot", "reflection"];
    default:
      return "tot"; // Default fallback
  }
}

// Create the main orchestrator
export function createReasoningOrchestrator() {
  const workflow = new StateGraph(OrchestratorState)
    .addNode("classify", classifyProblem)
    .addNode("tot", runToT)
    .addNode("reflection", runReflection)
    .addNode("synthesize", synthesizeResults)
    .addEdge("__start__", "classify")
    .addConditionalEdges(
      "classify",
      routeReasoning,
      {
        tot: "tot",
        reflection: "reflection",
      }
    )
    .addEdge("tot", "synthesize")
    .addEdge("reflection", "synthesize")
    .addEdge("synthesize", "__end__");

  return workflow.compile();
}

// API route handler with streaming
export async function POST(request: Request) {
  const { problem } = await request.json();

  const orchestrator = createReasoningOrchestrator();

  // Create the streaming response
  const encoder = new TextEncoder();
  const stream = new ReadableStream({
    async start(controller) {
      try {
        // Run the graph once. "updates" mode yields { nodeName: stateUpdate }
        // after each node finishes, so the final state can be collected from
        // the same run instead of invoking the graph a second time
        const finalState: Record<string, unknown> = {};

        for await (const update of await orchestrator.stream(
          { problem },
          { streamMode: "updates" }
        )) {
          for (const [node, data] of Object.entries(update)) {
            Object.assign(finalState, data);
            controller.enqueue(
              encoder.encode(
                `data: ${JSON.stringify({ type: "progress", node, data })}\n\n`
              )
            );
          }
        }

        // Send the final result
        const finalChunk = encoder.encode(
          `data: ${JSON.stringify({
            type: "complete",
            answer: finalState.final_answer,
            method: finalState.reasoning_method,
            tot_result: finalState.tot_result,
            reflection_result: finalState.reflection_result,
          })}\n\n`
        );

        controller.enqueue(finalChunk);
        controller.close();
      } catch (error) {
        controller.error(error);
      }
    },
  });

  return new Response(stream, {
    headers: {
      'Content-Type': 'text/event-stream',
      'Cache-Control': 'no-cache',
      'Connection': 'keep-alive',
    },
  });
}

The orchestrator selects and combines reasoning strategies based on how the classifier labels the problem.
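Routing depends on the classifier returning one of three exact labels, and model output is rarely that tidy. The sketch below shows one way to normalize a raw reply before routing; `routeFor` is a hypothetical helper, and falling back to `"tot"` for unknown labels follows the default branch of the router above.

```typescript
type ReasoningNode = "tot" | "reflection";

// Map a raw classifier reply to the node(s) to run.
// Anything that is not a letter is stripped before matching.
function routeFor(rawLabel: string): ReasoningNode[] {
  const label = rawLabel.toLowerCase().replace(/[^a-z]/g, "");
  switch (label) {
    case "exploratory":
      return ["tot"];
    case "refinement":
      return ["reflection"];
    case "hybrid":
      return ["tot", "reflection"];
    default:
      return ["tot"]; // unknown labels fall back to Tree-of-Thought
  }
}

console.log(routeFor('"Hybrid"\n').join("+")); // prints: tot+reflection
console.log(routeFor("Refinement.").join("+")); // prints: reflection
console.log(routeFor("not sure").join("+")); // prints: tot
```

Returning an array in every case keeps the caller uniform: one entry runs a single strategy, two entries fan out to both before synthesis.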

4. Frontend with Real-Time Streaming

// app/components/AdvancedReasoningAgent.tsx
'use client';

import { useState, useCallback } from 'react';
// `map` is exported from the lodash-compatible entry point
import { map } from 'es-toolkit/compat';

interface StreamEvent {
  type: 'progress' | 'complete';
  node?: string;
  data?: any;
  answer?: string;
  method?: string;
  tot_result?: any;
  reflection_result?: any;
}

export function AdvancedReasoningAgent() {
  const [problem, setProblem] = useState('');
  const [events, setEvents] = useState<StreamEvent[]>([]);
  const [isStreaming, setIsStreaming] = useState(false);

  const startReasoning = useCallback(async () => {
    setIsStreaming(true);
    setEvents([]);

    try {
      const response = await fetch('/agents/advanced-reasoning/orchestrator', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ problem }),
      });

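      const reader = response.body?.getReader();
      const decoder = new TextDecoder();

      if (!reader) throw new Error('No readable stream available');

      // A network chunk can end mid-message, so buffer the text and only parse
      // complete SSE messages (each one is terminated by a blank line)
      let buffer = '';

      while (true) {
        const { done, value } = await reader.read();
        if (done) break;

        buffer += decoder.decode(value, { stream: true });
        const messages = buffer.split('\n\n');
        buffer = messages.pop() ?? '';

        for (const message of messages) {
          if (message.startsWith('data: ')) {
            const data = JSON.parse(message.slice(6));
            setEvents(prev => [...prev, data]);
          }
        }
      }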
    } catch (error) {
      console.error('Streaming error:', error);
    } finally {
      setIsStreaming(false);
    }
  }, [problem]);

  const completeEvent = events.find(e => e.type === 'complete');
  const progressEvents = events.filter(e => e.type === 'progress');

  return (
    <div className="max-w-4xl mx-auto p-6 space-y-6">
      <div className="card bg-base-100 shadow-xl">
        <div className="card-body">
          <h2 className="card-title">Advanced Multi-Agent Reasoning</h2>

          <textarea
            className="textarea textarea-bordered h-32"
            placeholder="Enter a complex problem that requires deep reasoning..."
            value={problem}
            onChange={(e) => setProblem(e.target.value)}
          />

          <button
            className={`btn btn-primary ${isStreaming ? 'loading' : ''}`}
            onClick={startReasoning}
            disabled={isStreaming || !problem}
          >
            {isStreaming ? 'Reasoning in progress...' : 'Start advanced reasoning'}
          </button>
        </div>
      </div>

      {progressEvents.length > 0 && (
        <div className="card bg-base-200">
          <div className="card-body">
            <h3 className="font-semibold">Reasoning Progress</h3>
            <div className="space-y-2">
              {map(progressEvents, (event, idx) => (
                <div key={idx} className="flex items-center gap-2">
                  <div className="badge badge-primary">{event.node}</div>
                  <span className="text-sm opacity-70">Processing...</span>
                </div>
              ))}
            </div>
          </div>
        </div>
      )}

      {completeEvent && (
        <div className="space-y-4">
          <div className="card bg-success text-success-content">
            <div className="card-body">
              <h3 className="card-title">Final Answer</h3>
              <p className="whitespace-pre-wrap">{completeEvent.answer}</p>
              <div className="card-actions justify-end">
                <div className="badge badge-outline">
                  Method: {completeEvent.method}
                </div>
              </div>
            </div>
          </div>

          {completeEvent.tot_result && (
            <div className="card bg-base-100">
              <div className="card-body">
                <h4 className="font-semibold">Tree-of-Thought Exploration</h4>
                <div className="text-sm space-y-1">
                  {map(completeEvent.tot_result.paths[0], (step, idx) => (
                    <div key={idx} className="pl-4 border-l-2 border-primary">
                      Step {idx + 1}: {step}
                    </div>
                  ))}
                </div>
              </div>
            </div>
          )}

          {completeEvent.reflection_result && (
            <div className="card bg-base-100">
              <div className="card-body">
                <h4 className="font-semibold">Self-Reflection Process</h4>
                <p className="text-sm opacity-70">
                  Refined over {completeEvent.reflection_result.iterations} iterations
                </p>
              </div>
            </div>
          )}
        </div>
      )}
    </div>
  );
}

The frontend gives real-time feedback on the reasoning process by rendering progress events as they arrive.
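One detail worth isolating is that a network chunk does not have to end on a message boundary. The sketch below buffers incoming text and only parses complete `data:` messages; `createSseParser` is a hypothetical helper, and it assumes each message is a single `data: <json>` line terminated by a blank line, which matches what the route handler emits.

```typescript
// Hypothetical helper: accumulates text chunks and returns the payloads of
// every complete "data: ..." message received so far.
function createSseParser() {
  let buffer = "";
  return {
    push(chunk: string): unknown[] {
      buffer += chunk;
      const messages = buffer.split("\n\n");
      // The last piece is either empty or an incomplete message; keep it for later.
      buffer = messages.pop() ?? "";
      return messages
        .filter((message) => message.startsWith("data: "))
        .map((message) => JSON.parse(message.slice(6)) as unknown);
    },
  };
}

// The second message is deliberately split across two chunks.
const parser = createSseParser();
const first = parser.push('data: {"type":"progress","node":"classify"}\n\ndata: {"type":"comp');
const second = parser.push('lete","answer":"42"}\n\n');

console.log(first.length, second.length); // prints: 1 1
```

Parsing each chunk independently would throw on the truncated JSON in the first chunk; buffering defers it until the rest arrives.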

Performance Optimization

// app/lib/reasoning-cache.ts
import { LRUCache } from 'lru-cache';
import { createHash } from 'crypto';

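// Cache for reasoning results. Keys are exact-match hashes of the problem
// and method, so this is not a similarity-based (semantic) cache
const reasoningCache = new LRUCache<string, any>({
  max: 100,
  ttl: 1000 * 60 * 60, // 1 hour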
  updateAgeOnGet: true,
});

export function getCacheKey(problem: string, method: string): string {
  return createHash('sha256')
    .update(`${problem}-${method}`)
    .digest('hex');
}

export async function withCache<T>(
  key: string,
  fn: () => Promise<T>
): Promise<T> {
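  const cached = reasoningCache.get(key);
  // Compare against undefined so falsy results (e.g. "" or 0) still count as hits
  if (cached !== undefined) {
    console.log(`Cache hit for key: ${key}`);
    return cached;
  }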

  const result = await fn();
  reasoningCache.set(key, result);
  return result;
}

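// Parallel processing helper: runs at most maxConcurrency tasks at a time
// and returns results in the same order as the input tasks
export async function parallelReasoning<T>(
  tasks: Array<() => Promise<T>>,
  maxConcurrency: number = 3
): Promise<T[]> {
  const results: T[] = new Array(tasks.length);
  const executing = new Set<Promise<void>>();

  for (const [index, task] of tasks.entries()) {
    // Each promise removes itself from the pool when it settles, so the pool
    // only ever holds tasks that are still running
    const promise: Promise<void> = task()
      .then(result => {
        results[index] = result;
      })
      .finally(() => {
        executing.delete(promise);
      });

    executing.add(promise);

    if (executing.size >= maxConcurrency) {
      await Promise.race(executing);
    }
  }

  await Promise.all(executing);
  return results;
}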

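Caching avoids repeated model calls for problems that have already been solved, and bounded parallelism shortens wall-clock time, so together they reduce both latency and cost.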
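Since the cache key is derived from the exact problem text, trivially different phrasings miss the cache. A small normalization step before key generation is one possible mitigation; the sketch below uses a plain `Map` in place of the LRU cache, and `normalizeProblem` and `solve` are hypothetical stand-ins.

```typescript
// Hypothetical helper: canonicalize a problem before it is turned into a cache key.
function normalizeProblem(problem: string): string {
  return problem.trim().toLowerCase().replace(/\s+/g, " ");
}

const cache = new Map<string, string>();
let modelCalls = 0;

// Stand-in for an expensive reasoning run; counts how often the "model" is hit.
function solve(problem: string): string {
  const key = normalizeProblem(problem);
  const cached = cache.get(key);
  if (cached !== undefined) return cached;

  modelCalls += 1;
  const answer = `answer to: ${key}`;
  cache.set(key, answer);
  return answer;
}

solve("What is  2 + 2?");
solve("  what is 2 + 2? ");

console.log(modelCalls); // prints: 1
```

Lowercasing is a trade-off: it raises the hit rate for natural-language questions but would wrongly merge problems where case matters, such as code snippets, so it may be safer to limit normalization to whitespace in those cases.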

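Conclusion

Reasoning techniques turn agents from simple responders into deliberate problem solvers. Start with basic Chain-of-Thought for linear problems, then add Tree-of-Thought for exploration and self-reflection for refinement. The orchestrator pattern selects a reasoning strategy dynamically based on the problem's characteristics. Remember to implement caching, streaming, and parallel processing for production-grade performance. Combined with LangGraph's state management and TypeScript's type safety, these patterns produce reasoning systems that can tackle complex real-world problems in serverless environments.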