草案 智能体设计模式 - 规划

人工智能智能体规划langchainlanggraphtypescript
By sko X opus 4.19/20/202521 min read

学习如何实现规划模式,使AI智能体能够分解复杂任务、创建多步骤策略,并使用TypeScript、LangChain、LangGraph和Google Gemini模型在Vercel无服务器平台上执行复杂的工作流程。

心智模型:建筑工地管理员

将规划智能体想象成监督建筑项目的建筑工地管理员。管理员不亲自砌砖或安装管道,而是制定全面的计划,将任务委派给专业工人,监控进度,并在出现问题时调整策略。同样,规划智能体将复杂问题分解为可管理的步骤,通过专业工具或子智能体编排执行,并根据中间结果调整计划。这种规划与执行的分离使得能够处理单次通过方法无法应对的复杂性。

基础示例:计划执行智能体

1. 定义智能体状态类型

// lib/planning/types.ts
import { z } from 'zod';
import { Annotation } from '@langchain/langgraph';
import { BaseMessage } from '@langchain/core/messages';

// 单个计划步骤的模式
export const PlanStepSchema = z.object({
  step: z.number(),
  action: z.string(),
  reasoning: z.string(),
  dependencies: z.array(z.number()).default([]),
  status: z.enum(['pending', 'in_progress', 'completed', 'failed']).default('pending'),
  result: z.string().optional(),
});

export type PlanStep = z.infer<typeof PlanStepSchema>;

// 使用LangGraph注解的状态定义
export const PlanningAgentState = Annotation.Root({
  messages: Annotation<BaseMessage[]>({
    reducer: (current, update) => current.concat(update),
    default: () => [],
  }),
  plan: Annotation<PlanStep[]>({
    reducer: (current, update) => update,
    default: () => [],
  }),
  currentStep: Annotation<number>({
    default: () => 0
  }),
  executionResults: Annotation<Record<number, any>>({
    reducer: (current, update) => ({ ...current, ...update }),
    default: () => ({}),
  }),
  finalOutput: Annotation<string>({
    default: () => ''
  }),
});

export type AgentState = typeof PlanningAgentState.State;

使用LangGraph的注解系统定义强类型状态结构,包含计划步骤、执行跟踪和消息历史,用于状态管理。

2. 创建规划节点

// lib/planning/nodes/planner.ts
import { ChatGoogleGenerativeAI } from '@langchain/google-genai';
import { HumanMessage, SystemMessage } from '@langchain/core/messages';
import { map, filter, sortBy } from 'es-toolkit';
import { AgentState, PlanStep } from '../types';

const plannerModel = new ChatGoogleGenerativeAI({
  model: 'gemini-2.5-pro',
  temperature: 0.1,
  maxOutputTokens: 8192,
}).withStructuredOutput({
  name: 'plan',
  description: 'Structured plan for task execution',
  parameters: {
    type: 'object',
    properties: {
      steps: {
        type: 'array',
        items: {
          type: 'object',
          properties: {
            step: { type: 'number' },
            action: { type: 'string' },
            reasoning: { type: 'string' },
            dependencies: {
              type: 'array',
              items: { type: 'number' }
            },
          },
          required: ['step', 'action', 'reasoning'],
        },
      },
    },
    required: ['steps'],
  },
});

export async function plannerNode(state: AgentState): Promise<Partial<AgentState>> {
  const userMessage = state.messages[state.messages.length - 1];

  const systemPrompt = `You are a planning agent. Break down the user's request into clear, actionable steps.
  Each step should:
  1. Have a clear action to perform
  2. Include reasoning for why this step is necessary
  3. List any dependencies on previous steps (using step numbers)

  Ensure steps are ordered logically with dependencies respected.`;

  const response = await plannerModel.invoke([
    new SystemMessage(systemPrompt),
    userMessage,
  ]);

  // 使用es-toolkit处理和验证步骤
  const processedSteps = map(
    response.steps,
    (step: any, index: number) => ({
      ...step,
      step: index + 1,
      status: 'pending' as const,
      dependencies: step.dependencies || [],
    })
  );

  // 按依赖关系排序以确保正确的执行顺序
  const sortedSteps = sortBy(
    processedSteps,
    [(step: PlanStep) => step.dependencies.length, 'step']
  );

  return {
    plan: sortedSteps,
    currentStep: 0,
  };
}

创建一个规划节点,使用Gemini 2.5 Pro进行高质量推理,分析用户请求并生成结构化、依赖感知的执行计划。

3. 创建执行节点

// lib/planning/nodes/executor.ts
import { ChatGoogleGenerativeAI } from '@langchain/google-genai';
import { SystemMessage, HumanMessage } from '@langchain/core/messages';
import { filter, find, every, map as mapArray } from 'es-toolkit';
import { AgentState, PlanStep } from '../types';

const executorModel = new ChatGoogleGenerativeAI({
  model: 'gemini-2.5-flash',  // 更快、更便宜的执行模型
  temperature: 0,
  maxOutputTokens: 2048,
});

export async function executorNode(state: AgentState): Promise<Partial<AgentState>> {
  const { plan, currentStep, executionResults } = state;

  // 查找下一个可执行步骤
  const executableStep = find(plan, (step: PlanStep) => {
    // 步骤必须是待处理状态且所有依赖项已完成
    if (step.status !== 'pending') return false;

    const dependenciesMet = every(
      step.dependencies,
      (depId: number) => {
        const depStep = find(plan, (s: PlanStep) => s.step === depId);
        return depStep?.status === 'completed';
      }
    );

    return dependenciesMet;
  });

  if (!executableStep) {
    // 没有更多步骤要执行
    return {
      finalOutput: generateSummary(plan, executionResults),
    };
  }

  // 执行步骤
  const contextFromDeps = executableStep.dependencies
    .map(depId => {
      const result = executionResults[depId];
      return result ? `Step ${depId} result: ${result}` : '';
    })
    .filter(Boolean)
    .join('\n');

  const executionPrompt = `Execute the following action:
  Action: ${executableStep.action}
  Reasoning: ${executableStep.reasoning}
  ${contextFromDeps ? `\nContext from previous steps:\n${contextFromDeps}` : ''}

  Provide a concise result of the action execution.`;

  const result = await executorModel.invoke([
    new SystemMessage('You are an execution agent. Perform the requested action and return the result.'),
    new HumanMessage(executionPrompt),
  ]);

  // 使用es-toolkit更新计划和结果
  const updatedPlan = mapArray(plan, (step: PlanStep) =>
    step.step === executableStep.step
      ? { ...step, status: 'completed' as const, result: result.content as string }
      : step
  );

  return {
    plan: updatedPlan,
    executionResults: {
      [executableStep.step]: result.content,
    },
    currentStep: currentStep + 1,
  };
}

function generateSummary(plan: PlanStep[], results: Record<number, any>): string {
  const completedSteps = filter(plan, (s: PlanStep) => s.status === 'completed');
  return `Completed ${completedSteps.length} steps successfully.
Final results: ${JSON.stringify(results, null, 2)}`;
}

使用Gemini 2.5 Flash执行单个计划步骤以实现成本效率,管理依赖关系并为后续步骤积累结果。

4. 构建规划图

// lib/planning/graph.ts
import { StateGraph, END, MemorySaver } from '@langchain/langgraph';
import { AgentState } from './types';
import { plannerNode } from './nodes/planner';
import { executorNode } from './nodes/executor';
import { every } from 'es-toolkit';

export function createPlanningGraph() {
  const workflow = new StateGraph<AgentState>({
    stateSchema: AgentState,
  });

  // 添加节点
  workflow.addNode('planner', plannerNode);
  workflow.addNode('executor', executorNode);

  // 定义条件边
  workflow.addConditionalEdges(
    'executor',
    (state: AgentState) => {
      // 使用es-toolkit检查是否所有步骤都已完成
      const allCompleted = every(
        state.plan,
        step => step.status === 'completed' || step.status === 'failed'
      );

      return allCompleted ? 'end' : 'continue';
    },
    {
      end: END,
      continue: 'executor',
    }
  );

  // 设置流程
  workflow.setEntryPoint('planner');
  workflow.addEdge('planner', 'executor');

  return workflow.compile({
    checkpointer: new MemorySaver(), // 对于无服务器,在生产中使用外部存储
  });
}

组装具有条件执行逻辑的规划工作流,支持迭代步骤执行直到所有任务完成。

5. 规划智能体的API路由

// app/api/planning/route.ts
import { createPlanningGraph } from '@/lib/planning/graph';
import { HumanMessage } from '@langchain/core/messages';
import { NextResponse } from 'next/server';
import { v4 as uuidv4 } from 'uuid';

export const runtime = 'nodejs';
export const maxDuration = 300;

export async function POST(req: Request) {
  try {
    const { message, sessionId = uuidv4() } = await req.json();

    const graph = createPlanningGraph();

    // 流式事件以提供实时更新
    const encoder = new TextEncoder();
    const stream = new TransformStream();
    const writer = stream.writable.getWriter();

    (async () => {
      try {
        const events = graph.stream(
          {
            messages: [new HumanMessage(message)],
          },
          {
            configurable: { thread_id: sessionId },
            streamMode: 'values',
          }
        );

        for await (const event of events) {
          // 向前端发送计划更新
          if (event.plan) {
            await writer.write(
              encoder.encode(`data: ${JSON.stringify({
                type: 'plan_update',
                plan: event.plan,
                currentStep: event.currentStep,
              })}\n\n`)
            );
          }

          // 发送最终输出
          if (event.finalOutput) {
            await writer.write(
              encoder.encode(`data: ${JSON.stringify({
                type: 'complete',
                output: event.finalOutput,
              })}\n\n`)
            );
          }
        }
      } catch (error) {
        console.error('Streaming error:', error);
        await writer.write(
          encoder.encode(`data: ${JSON.stringify({
            type: 'error',
            error: 'Processing failed',
          })}\n\n`)
        );
      } finally {
        await writer.close();
      }
    })();

    return new Response(stream.readable, {
      headers: {
        'Content-Type': 'text/event-stream',
        'Cache-Control': 'no-cache',
        'Connection': 'keep-alive',
      },
    });
  } catch (error) {
    console.error('Planning API error:', error);
    return NextResponse.json(
      { error: 'Failed to process request' },
      { status: 500 }
    );
  }
}

通过流式API端点公开规划智能体,在智能体执行计划过程中提供实时更新。

6. 使用React Query的前端组件

// components/PlanningInterface.tsx
'use client';

import { useState } from 'react';
import { useMutation } from '@tanstack/react-query';
import { map } from 'es-toolkit';

interface PlanStep {
  step: number;
  action: string;
  reasoning: string;
  status: 'pending' | 'in_progress' | 'completed' | 'failed';
  result?: string;
}

export default function PlanningInterface() {
  const [input, setInput] = useState('');
  const [plan, setPlan] = useState<PlanStep[]>([]);
  const [output, setOutput] = useState('');

  const executePlan = useMutation({
    mutationFn: async (message: string) => {
      const response = await fetch('/api/planning', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ message }),
      });

      if (!response.ok) throw new Error('Failed to execute plan');
      if (!response.body) throw new Error('No response body');

      const reader = response.body.getReader();
      const decoder = new TextDecoder();

      while (true) {
        const { done, value } = await reader.read();
        if (done) break;

        const chunk = decoder.decode(value);
        const lines = chunk.split('\n');

        for (const line of lines) {
          if (line.startsWith('data: ')) {
            try {
              const data = JSON.parse(line.slice(6));

              if (data.type === 'plan_update') {
                setPlan(data.plan);
              } else if (data.type === 'complete') {
                setOutput(data.output);
              }
            } catch (e) {
              console.error('Parse error:', e);
            }
          }
        }
      }
    },
  });

  const getStatusBadge = (status: string) => {
    const badges = {
      pending: 'badge-ghost',
      in_progress: 'badge-info',
      completed: 'badge-success',
      failed: 'badge-error',
    };
    return badges[status as keyof typeof badges] || 'badge-ghost';
  };

  return (
    <div className="card bg-base-100 shadow-xl">
      <div className="card-body">
        <h2 className="card-title">规划智能体</h2>

        <form onSubmit={(e) => {
          e.preventDefault();
          executePlan.mutate(input);
        }}>
          <div className="form-control">
            <textarea
              className="textarea textarea-bordered h-24"
              placeholder="描述一个复杂任务..."
              value={input}
              onChange={(e) => setInput(e.target.value)}
              disabled={executePlan.isPending}
            />
          </div>

          <div className="card-actions justify-end mt-4">
            <button
              type="submit"
              className="btn btn-primary"
              disabled={!input || executePlan.isPending}
            >
              {executePlan.isPending ? (
                <>
                  <span className="loading loading-spinner"></span>
                  规划与执行中...
                </>
              ) : '创建计划并执行'}
            </button>
          </div>
        </form>

        {plan.length > 0 && (
          <div className="mt-6">
            <h3 className="font-bold mb-2">执行计划:</h3>
            <ul className="steps steps-vertical">
              {map(plan, (step) => (
                <li
                  key={step.step}
                  className={`step ${step.status === 'completed' ? 'step-primary' : ''}`}
                >
                  <div className="text-left">
                    <div className="flex items-center gap-2">
                      <span className="font-semibold">{step.action}</span>
                      <span className={`badge badge-sm ${getStatusBadge(step.status)}`}>
                        {step.status}
                      </span>
                    </div>
                    <p className="text-sm opacity-70">{step.reasoning}</p>
                    {step.result && (
                      <div className="mt-2 p-2 bg-base-200 rounded text-sm">
                        {step.result}
                      </div>
                    )}
                  </div>
                </li>
              ))}
            </ul>
          </div>
        )}

        {output && (
          <div className="alert alert-success mt-4">
            <span>{output}</span>
          </div>
        )}

        {executePlan.isError && (
          <div className="alert alert-error mt-4">
            <span>执行计划失败。请重试。</span>
          </div>
        )}
      </div>
    </div>
  );
}

React组件,使用DaisyUI组件实时可视化计划创建和执行,显示步骤进度和结果。

高级示例:带思维树的ReAct智能体

1. 已安装的额外依赖项

// 项目设置中已提供所有依赖项:
// @langchain/google-genai - Google AI模型
// @langchain/langgraph - 有状态工作流
// es-toolkit, es-toolkit/compat - 函数式工具
// zod - 模式验证
// @tanstack/react-query - 数据获取

无需额外安装 - 项目设置中已预安装所有必需的包。

// lib/advanced-planning/types.ts
import { z } from 'zod';
import { Annotation } from '@langchain/langgraph';
import { BaseMessage } from '@langchain/core/messages';

// 树结构的思维节点
export const ThoughtNodeSchema = z.object({
  id: z.string(),
  content: z.string(),
  score: z.number(),
  depth: z.number(),
  parentId: z.string().nullable(),
  children: z.array(z.string()).default([]),
  isTerminal: z.boolean().default(false),
  metadata: z.record(z.any()).optional(),
});

export type ThoughtNode = z.infer<typeof ThoughtNodeSchema>;

// 增强的树探索状态
export const TreeAgentState = Annotation.Root({
  messages: Annotation<BaseMessage[]>({
    reducer: (current, update) => current.concat(update),
    default: () => [],
  }),
  thoughtTree: Annotation<Map<string, ThoughtNode>>({
    reducer: (current, update) => new Map([...current, ...update]),
    default: () => new Map(),
  }),
  currentNodeId: Annotation<string | null>({
    default: () => null
  }),
  bestPath: Annotation<string[]>({
    reducer: (current, update) => update,
    default: () => [],
  }),
  explorationBudget: Annotation<number>({
    default: () => 10
  }),
  iterationCount: Annotation<number>({
    reducer: (current, update) => current + update,
    default: () => 0,
  }),
});

export type TreeState = typeof TreeAgentState.State;

定义基于树的探索状态,支持分支思维过程,具有评分和深度跟踪,用于复杂推理。

2. 实现带树探索的ReAct模式

// lib/advanced-planning/nodes/react-tree.ts
import { ChatGoogleGenerativeAI } from '@langchain/google-genai';
import { SystemMessage, HumanMessage, AIMessage } from '@langchain/core/messages';
import { maxBy, filter, map, sortBy, take } from 'es-toolkit';
import { v4 as uuidv4 } from 'uuid';
import { TreeState, ThoughtNode } from '../types';

const reasoningModel = new ChatGoogleGenerativeAI({
  model: 'gemini-2.5-pro',
  temperature: 0.7,
  maxOutputTokens: 8192,
}).withStructuredOutput({
  name: 'thought_expansion',
  description: 'Generate multiple reasoning paths',
  parameters: {
    type: 'object',
    properties: {
      thoughts: {
        type: 'array',
        items: {
          type: 'object',
          properties: {
            reasoning: { type: 'string' },
            action: { type: 'string' },
            confidence: { type: 'number' },
          },
        },
      },
    },
  },
});

export async function reactTreeNode(state: TreeState): Promise<Partial<TreeState>> {
  const { thoughtTree, currentNodeId, explorationBudget, iterationCount } = state;

  // 检查探索预算
  if (iterationCount >= explorationBudget) {
    return selectBestPath(state);
  }

  // 获取当前上下文
  const currentNode = currentNodeId ? thoughtTree.get(currentNodeId) : null;
  const context = buildContext(state, currentNode);

  // 生成多个思维分支
  const thoughtPrompt = `Given the current context, generate 3 different reasoning paths.
  Each path should:
  1. Provide unique reasoning approach
  2. Suggest a specific action
  3. Estimate confidence (0-1) in the approach

  Context: ${context}

  Current thought: ${currentNode?.content || 'Initial state'}`;

  const response = await reasoningModel.invoke([
    new SystemMessage('You are a reasoning agent exploring multiple solution paths.'),
    new HumanMessage(thoughtPrompt),
  ]);

  // 创建新的思维节点
  const newNodes = new Map<string, ThoughtNode>();
  const parentDepth = currentNode?.depth || 0;

  for (const thought of response.thoughts) {
    const nodeId = uuidv4();
    const node: ThoughtNode = {
      id: nodeId,
      content: `${thought.reasoning} → ${thought.action}`,
      score: thought.confidence * (1 / (parentDepth + 1)), // 分数随深度衰减
      depth: parentDepth + 1,
      parentId: currentNodeId,
      children: [],
      isTerminal: false,
      metadata: { action: thought.action },
    };

    newNodes.set(nodeId, node);

    // 更新父节点的子节点
    if (currentNode) {
      currentNode.children.push(nodeId);
      thoughtTree.set(currentNodeId!, {
        ...currentNode,
        children: currentNode.children,
      });
    }
  }

  // 使用es-toolkit选择下一个要探索的节点(最高分数)
  const nextNode = maxBy(
    Array.from(newNodes.values()),
    (node: ThoughtNode) => node.score
  );

  return {
    thoughtTree: new Map([...thoughtTree, ...newNodes]),
    currentNodeId: nextNode?.id || null,
    iterationCount: 1,
  };
}

function buildContext(state: TreeState, currentNode: ThoughtNode | null): string {
  if (!currentNode) {
    return state.messages[state.messages.length - 1]?.content as string || '';
  }

  // 使用es-toolkit构建从根到当前的路径
  const path: ThoughtNode[] = [];
  let node: ThoughtNode | null = currentNode;

  while (node) {
    path.unshift(node);
    node = node.parentId ? state.thoughtTree.get(node.parentId) || null : null;
  }

  return map(path, (n: ThoughtNode) => n.content).join(' → ');
}

function selectBestPath(state: TreeState): Partial<TreeState> {
  const { thoughtTree } = state;

  // 使用es-toolkit查找终端节点或最深节点
  const allNodes = Array.from(thoughtTree.values());
  const terminalNodes = filter(allNodes, (n: ThoughtNode) =>
    n.isTerminal || n.children.length === 0
  );

  // 选择最佳终端节点
  const bestNode = maxBy(terminalNodes, (n: ThoughtNode) => n.score);

  if (!bestNode) {
    return { bestPath: [] };
  }

  // 重构路径
  const path: string[] = [];
  let current: ThoughtNode | null = bestNode;

  while (current) {
    path.unshift(current.id);
    current = current.parentId ? thoughtTree.get(current.parentId) || null : null;
  }

  return { bestPath: path };
}

实现基于树的探索,结合ReAct模式与思维树,使用Gemini 2.5 Pro进行系统化的多重推理路径探索。

3. 带反思的动作执行

// lib/advanced-planning/nodes/reflective-executor.ts
import { ChatGoogleGenerativeAI } from '@langchain/google-genai';
import { DynamicStructuredTool } from '@langchain/core/tools';
import { z } from 'zod';
import { find } from 'es-toolkit';
import { TreeState, ThoughtNode } from '../types';

// 定义带模式的工具
const searchTool = new DynamicStructuredTool({
  name: 'search',
  description: 'Search for information',
  schema: z.object({
    query: z.string(),
  }),
  func: async ({ query }) => {
    // 模拟搜索
    return `Search results for "${query}": [relevant information]`;
  },
});

const calculateTool = new DynamicStructuredTool({
  name: 'calculate',
  description: 'Perform calculations',
  schema: z.object({
    expression: z.string(),
  }),
  func: async ({ expression }) => {
    // 使用math.js或类似工具进行实际计算
    try {
      // 演示用途的安全评估
      const result = Function('"use strict"; return (' + expression + ')')();
      return `Calculation result: ${result}`;
    } catch (e) {
      return `Calculation error: ${e}`;
    }
  },
});

const tools = [searchTool, calculateTool];

export async function reflectiveExecutor(state: TreeState): Promise<Partial<TreeState>> {
  const { thoughtTree, currentNodeId } = state;

  if (!currentNodeId) return {};

  const currentNode = thoughtTree.get(currentNodeId);
  if (!currentNode?.metadata?.action) return {};

  const action = currentNode.metadata.action as string;

  // 执行动作
  let result: string;
  try {
    // 解析动作并执行适当的工具
    const toolMatch = action.match(/(\w+)\((.*)\)/);
    if (toolMatch) {
      const [, toolName, args] = toolMatch;
      const tool = find(tools, (t) => t.name === toolName);

      if (tool) {
        // 安全解析参数
        const parsedArgs = args ? { [toolName === 'search' ? 'query' : 'expression']: args.replace(/['"]/g, '') } : {};
        result = await tool.func(parsedArgs);
      } else {
        result = `Unknown tool: ${toolName}`;
      }
    } else {
      result = 'No valid action to execute';
    }
  } catch (error) {
    result = `Execution error: ${error}`;
  }

  // 使用Gemini Flash反思结果
  const reflectionModel = new ChatGoogleGenerativeAI({
    model: 'gemini-2.5-flash',
    temperature: 0,
    maxOutputTokens: 1024,
  });

  const reflection = await reflectionModel.invoke([
    new SystemMessage('Evaluate if this result successfully addresses the goal.'),
    new HumanMessage(`Action: ${action}\nResult: ${result}\nDoes this achieve our objective?`),
  ]);

  // 用反思更新节点
  const updatedNode: ThoughtNode = {
    ...currentNode,
    isTerminal: reflection.content?.includes('success') || reflection.content?.includes('achieved') || false,
    metadata: {
      ...currentNode.metadata,
      result,
      reflection: reflection.content,
    },
  };

  thoughtTree.set(currentNodeId, updatedNode);

  return {
    thoughtTree,
  };
}

从思维节点执行动作并使用Gemini 2.5 Flash进行反思来评估成功,使智能体能够从执行结果中学习。

4. 带并行探索的编排图

// lib/advanced-planning/graph.ts
import { StateGraph, END } from '@langchain/langgraph';
import { filter } from 'es-toolkit';
import { TreeState, ThoughtNode } from './types';
import { reactTreeNode } from './nodes/react-tree';
import { reflectiveExecutor } from './nodes/reflective-executor';

export function createAdvancedPlanningGraph() {
  const workflow = new StateGraph<TreeState>({
    stateSchema: TreeState,
  });

  // 添加节点
  workflow.addNode('think', reactTreeNode);
  workflow.addNode('act', reflectiveExecutor);
  workflow.addNode('select_best', selectBestPathNode);

  // 条件路由
  workflow.addConditionalEdges(
    'think',
    (state: TreeState) => {
      // 检查是否应该继续探索或执行
      const { iterationCount, explorationBudget } = state;

      if (iterationCount >= explorationBudget) {
        return 'select';
      }

      // 检查当前节点是否需要动作
      const currentNode = state.currentNodeId
        ? state.thoughtTree.get(state.currentNodeId)
        : null;

      return currentNode?.metadata?.action ? 'execute' : 'explore';
    },
    {
      explore: 'think',
      execute: 'act',
      select: 'select_best',
    }
  );

  workflow.addConditionalEdges(
    'act',
    (state: TreeState) => {
      const currentNode = state.currentNodeId
        ? state.thoughtTree.get(state.currentNodeId)
        : null;

      return currentNode?.isTerminal ? 'end' : 'continue';
    },
    {
      end: END,
      continue: 'think',
    }
  );

  workflow.addEdge('select_best', END);
  workflow.setEntryPoint('think');

  return workflow.compile();
}

async function selectBestPathNode(state: TreeState): Promise<Partial<TreeState>> {
  const { thoughtTree, bestPath } = state;

  if (bestPath.length === 0) {
    return { finalOutput: 'No solution found within exploration budget' };
  }

  // 使用es-toolkit从最佳路径编译结果
  const pathNodes = filter(
    bestPath.map(id => thoughtTree.get(id)),
    (node): node is ThoughtNode => node !== undefined
  );

  const solution = pathNodes
    .map(node => ({
      step: node.content,
      result: node.metadata?.result || 'N/A',
    }));

  return {
    finalOutput: JSON.stringify(solution, null, 2),
  };
}

编排复杂推理,在思考、行动和路径选择阶段之间进行条件分支,以实现最优问题解决。

5. 带缓存的性能优化

// lib/advanced-planning/cache.ts
import { hash } from 'es-toolkit/compat';
import { isEqual } from 'es-toolkit';

interface CacheEntry {
  result: any;
  timestamp: number;
  ttl: number;
}

// 对于Vercel无服务器,使用内存缓存或外部服务如Redis
// 这是演示用的简化内存版本
class InMemoryCache {
  private cache = new Map<string, CacheEntry>();

  get(key: string): any | null {
    const entry = this.cache.get(key);
    if (!entry) return null;

    if (Date.now() - entry.timestamp > entry.ttl * 1000) {
      this.cache.delete(key);
      return null;
    }

    return entry.result;
  }

  set(key: string, value: any, ttl: number): void {
    this.cache.set(key, {
      result: value,
      timestamp: Date.now(),
      ttl,
    });
  }

  delete(key: string): void {
    this.cache.delete(key);
  }
}

export class PlanningCache {
  private readonly prefix = 'planning:';
  private readonly defaultTTL = 3600; // 1小时
  private store = new InMemoryCache();

  async get(key: string): Promise<any | null> {
    try {
      const cacheKey = this.generateKey(key);
      return this.store.get(cacheKey);
    } catch (error) {
      console.error('Cache get error:', error);
      return null;
    }
  }

  async set(key: string, value: any, ttl?: number): Promise<void> {
    try {
      const cacheKey = this.generateKey(key);
      this.store.set(cacheKey, value, ttl || this.defaultTTL);
    } catch (error) {
      console.error('Cache set error:', error);
    }
  }

  private generateKey(input: string): string {
    // 使用es-toolkit/compat的hash进行一致的键生成
    const hashed = hash(input);
    return `${this.prefix}${hashed}`;
  }

  // 为相似查询进行语义相似性缓存
  async findSimilar(
    query: string,
    threshold: number = 0.85
  ): Promise<any | null> {
    try {
      // 在生产中,使用嵌入的向量相似性搜索
      // 这是简化的令牌重叠版本
      const queryTokens = new Set(query.toLowerCase().split(' '));

      // 检查缓存查询的相似性
      for (const [key, entry] of (this.store as any).cache) {
        if (!key.startsWith(this.prefix)) continue;

        // 如果存储,从元数据中提取原始查询
        const similarity = this.calculateSimilarity(query, key);
        if (similarity > threshold) {
          return entry.result;
        }
      }

      return null;
    } catch (error) {
      console.error('Similarity search error:', error);
      return null;
    }
  }

  private calculateSimilarity(query1: string, query2: string): number {
    // 演示用的简单Jaccard相似性
    const set1 = new Set(query1.toLowerCase().split(' '));
    const set2 = new Set(query2.toLowerCase().split(' '));

    const intersection = new Set([...set1].filter(x => set2.has(x)));
    const union = new Set([...set1, ...set2]);

    return intersection.size / union.size;
  }
}

// 为无服务器导出单例实例
export const planningCache = new PlanningCache();

使用es-toolkit工具实现智能缓存,具有TTL和语义相似性匹配,以减少冗余LLM调用并改善响应时间。

6. 树探索的前端可视化

// components/TreeVisualization.tsx
'use client';

import { useEffect, useRef } from 'react';
import { ThoughtNode } from '@/lib/advanced-planning/types';

interface TreeVisualizationProps {
  thoughtTree: Map<string, ThoughtNode>;
  currentNodeId: string | null;
  bestPath: string[];
}

export default function TreeVisualization({
  thoughtTree,
  currentNodeId,
  bestPath,
}: TreeVisualizationProps) {
  const canvasRef = useRef<HTMLCanvasElement>(null);

  useEffect(() => {
    if (!canvasRef.current) return;

    const canvas = canvasRef.current;
    const ctx = canvas.getContext('2d');
    if (!ctx) return;

    // 清除画布
    ctx.clearRect(0, 0, canvas.width, canvas.height);

    // 计算节点位置
    const positions = calculateTreeLayout(thoughtTree);

    // 绘制边
    thoughtTree.forEach((node) => {
      const nodePos = positions.get(node.id);
      if (!nodePos) return;

      node.children.forEach((childId) => {
        const childPos = positions.get(childId);
        if (!childPos) return;

        ctx.beginPath();
        ctx.moveTo(nodePos.x, nodePos.y);
        ctx.lineTo(childPos.x, childPos.y);

        // 突出显示最佳路径
        if (bestPath.includes(node.id) && bestPath.includes(childId)) {
          ctx.strokeStyle = '#10b981';
          ctx.lineWidth = 3;
        } else {
          ctx.strokeStyle = '#6b7280';
          ctx.lineWidth = 1;
        }

        ctx.stroke();
      });
    });

    // 绘制节点
    thoughtTree.forEach((node) => {
      const pos = positions.get(node.id);
      if (!pos) return;

      ctx.beginPath();
      ctx.arc(pos.x, pos.y, 20, 0, 2 * Math.PI);

      // 根据状态着色
      if (node.id === currentNodeId) {
        ctx.fillStyle = '#3b82f6'; // 当前节点 - 蓝色
      } else if (bestPath.includes(node.id)) {
        ctx.fillStyle = '#10b981'; // 最佳路径 - 绿色
      } else if (node.isTerminal) {
        ctx.fillStyle = '#f59e0b'; // 终端 - 橙色
      } else {
        ctx.fillStyle = '#e5e7eb'; // 默认 - 灰色
      }

      ctx.fill();
      ctx.strokeStyle = '#1f2937';
      ctx.lineWidth = 2;
      ctx.stroke();

      // 绘制分数
      ctx.fillStyle = '#1f2937';
      ctx.font = '12px sans-serif';
      ctx.textAlign = 'center';
      ctx.textBaseline = 'middle';
      ctx.fillText(node.score.toFixed(2), pos.x, pos.y);
    });
  }, [thoughtTree, currentNodeId, bestPath]);

  return (
    <div className="card bg-base-100 shadow-xl">
      <div className="card-body">
        <h3 className="card-title">思维树探索</h3>
        <canvas
          ref={canvasRef}
          width={800}
          height={400}
          className="border border-base-300 rounded-lg"
        />
        <div className="flex gap-4 mt-4 text-sm">
          <div className="flex items-center gap-2">
            <div className="w-4 h-4 bg-blue-500 rounded-full"></div>
            <span>当前</span>
          </div>
          <div className="flex items-center gap-2">
            <div className="w-4 h-4 bg-green-500 rounded-full"></div>
            <span>最佳路径</span>
          </div>
          <div className="flex items-center gap-2">
            <div className="w-4 h-4 bg-orange-500 rounded-full"></div>
            <span>终端</span>
          </div>
        </div>
      </div>
    </div>
  );
}

function calculateTreeLayout(
  thoughtTree: Map<string, ThoughtNode>
): Map<string, { x: number; y: number }> {
  const positions = new Map<string, { x: number; y: number }>();
  const levelCounts = new Map<number, number>();

  // 计算每层的节点数
  thoughtTree.forEach((node) => {
    const count = levelCounts.get(node.depth) || 0;
    levelCounts.set(node.depth, count + 1);
  });

  // 计算位置
  const levelIndices = new Map<number, number>();
  thoughtTree.forEach((node) => {
    const levelIndex = levelIndices.get(node.depth) || 0;
    const levelCount = levelCounts.get(node.depth) || 1;

    const x = (800 / (levelCount + 1)) * (levelIndex + 1);
    const y = 50 + node.depth * 100;

    positions.set(node.id, { x, y });
    levelIndices.set(node.depth, levelIndex + 1);
  });

  return positions;
}

实时可视化思维树探索,显示当前节点、最佳路径和终端节点,使用交互式画布渲染。

结论

规划模式将AI智能体从简单的响应者转变为能够处理复杂多步任务的复杂问题解决者。通过实现用于结构化工作流的计划执行模式,以及结合ReAct与思维树进行探索性推理,您可以构建匹配或超越人类规划能力的智能体。

这些实现的关键见解:

  • 关注点分离:规划和执行之间的分离使得能够为每个阶段使用最优模型(Gemini 2.5 Pro用于规划,Gemini 2.5 Flash用于执行)
  • 树探索:带评分机制的树探索比线性方法找到更好的解决方案
  • 缓存和优化:使复杂的规划在生产中经济可行
  • 实时可视化:帮助用户理解和信任智能体决策制定
  • es-toolkit使用:确保代码整洁、函数式,并针对无服务器环境优化

这些模式在Vercel的无服务器平台上运行,使用TypeScript、LangGraph和Google Gemini模型,为构建生产就绪的规划智能体提供基础,这些智能体能够分解复杂性、探索替代方案,并在777秒执行窗口内大规模交付可靠结果。