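Draft: Agentic Design Patterns - Goal Setting
Build intelligent agents that autonomously decompose complex goals, track progress, and adapt their strategy, all while running efficiently on Vercel's serverless infrastructure.
Mental Model: The Project Manager Agent
Think of a goal-setting agent as a professional project manager that takes a high-level objective and breaks it into actionable tasks. Just as a project manager breaks "launch a new product" into sprints, tasks, and subtasks while monitoring progress and adjusting the plan, our agent uses TypeScript and LangGraph's graph-based orchestration to decompose a goal into a hierarchy, track execution state, and adjust its strategy as progress comes in.
Basic Example: A Simple Goal Decomposition Agent
1. Define the Goal Agent State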
// app/agents/goal/state.ts
import { Annotation } from "@langchain/langgraph";
import { BaseMessage } from "@langchain/core/messages";
export const GoalAgentState = Annotation.Root({
messages: Annotation<BaseMessage[]>({
reducer: (current, update) => current.concat(update),
default: () => []
}),
currentGoal: Annotation<string>({
reducer: (current, update) => update || current,
default: () => ""
}),
subGoals: Annotation<string[]>({
reducer: (current, update) => {
if (Array.isArray(update)) return update;
return current.concat(update);
},
default: () => []
}),
completedSubGoals: Annotation<string[]>({
reducer: (current, update) => current.concat(update),
default: () => []
}),
goalStatus: Annotation<"planning" | "executing" | "completed" | "failed">({
reducer: (current, update) => update || current,
default: () => "planning"
})
});
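This state schema uses LangGraph's type-safe Annotation system to track the goal hierarchy and execution state. Each field declares a reducer that decides how a node's update is merged into the current value.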
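To make the reducer behaviour concrete, the sketch below replays the same merge rules in plain TypeScript, without LangGraph. `GoalSnapshot` and `applyUpdate` are hypothetical names used only for illustration; in the real graph, LangGraph applies each field's reducer internally.

```typescript
// Plain-TypeScript illustration of the reducer rules declared in GoalAgentState.
// `GoalSnapshot` and `applyUpdate` are hypothetical helpers, not LangGraph APIs.
type GoalStatus = "planning" | "executing" | "completed" | "failed";

interface GoalSnapshot {
  currentGoal: string;
  subGoals: string[];
  completedSubGoals: string[];
  goalStatus: GoalStatus;
}

function applyUpdate(state: GoalSnapshot, update: Partial<GoalSnapshot>): GoalSnapshot {
  return {
    // "update || current": a missing or empty update keeps the old value
    currentGoal: update.currentGoal || state.currentGoal,
    // an array update replaces the whole sub-goal list
    subGoals: update.subGoals ?? state.subGoals,
    // completed sub-goals accumulate across node runs
    completedSubGoals: state.completedSubGoals.concat(update.completedSubGoals ?? []),
    goalStatus: update.goalStatus || state.goalStatus,
  };
}

const initial: GoalSnapshot = {
  currentGoal: "",
  subGoals: [],
  completedSubGoals: [],
  goalStatus: "planning",
};
const planned = applyUpdate(initial, {
  currentGoal: "Launch a blog",
  subGoals: ["Pick a topic", "Write a draft"],
  goalStatus: "executing",
});
const afterFirstTask = applyUpdate(planned, { completedSubGoals: ["Pick a topic"] });
console.log(afterFirstTask);
```

Note that the second update only mentions `completedSubGoals`; every other field keeps its previous value, which is what lets each node return just the fields it changed.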
2. Create the Goal Decomposition Node
// app/agents/goal/nodes/decompose.ts
import { ChatGoogleGenerativeAI } from "@langchain/google-genai";
import { AIMessage, HumanMessage } from "@langchain/core/messages";
import { z } from "zod";
import { GoalAgentState } from "../state";

const SubGoalsSchema = z.object({
  subGoals: z.array(z.object({
    task: z.string().describe("A specific, actionable task"),
    priority: z.number().min(1).max(5).describe("Priority from 1 to 5"),
    dependencies: z.array(z.string()).describe("Tasks that must be completed first")
  }))
});
type SubGoal = z.infer<typeof SubGoalsSchema>["subGoals"][number];

export async function decomposeGoal(state: typeof GoalAgentState.State) {
  const model = new ChatGoogleGenerativeAI({
    modelName: "gemini-pro",
    temperature: 0.3
  });
  const prompt = `Break this goal down into specific, actionable subtasks:
Goal: ${state.currentGoal}
Consider the dependencies between tasks and assign each one a priority.`;
  // withStructuredOutput asks the model for output matching SubGoalsSchema and parses it,
  // replacing the OpenAI-style `functions` / `function_call` call options.
  const decomposition = await model
    .withStructuredOutput(SubGoalsSchema)
    .invoke([new HumanMessage(prompt)]);
  // Tasks without dependencies come first; ties are broken by priority (highest number first)
  const sortedGoals = [...decomposition.subGoals].sort((a: SubGoal, b: SubGoal) => {
    if (a.dependencies.length === 0 && b.dependencies.length > 0) return -1;
    if (b.dependencies.length === 0 && a.dependencies.length > 0) return 1;
    return b.priority - a.priority;
  });
  const subGoalTasks = sortedGoals.map((goal) => goal.task);
  return {
    messages: [new AIMessage(`Decomposed the goal into ${subGoalTasks.length} sub-goals.`)],
    subGoals: subGoalTasks,
    goalStatus: "executing" as const
  };
}
The decomposition node uses structured output to break the goal into prioritized, dependency-aware subtasks.
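The sort in the node only moves dependency-free tasks to the front. If a stricter guarantee is wanted, one option is a priority-aware topological order, sketched below under stated assumptions: `PlannedTask` and `orderTasks` are hypothetical names, dependencies are matched by exact task name, and a higher priority number means "do sooner", as in the node's own sort.

```typescript
// A sketch of dependency-first ordering with priority as the tie-breaker.
// `PlannedTask` and `orderTasks` are illustrative names, not part of the article's code.
interface PlannedTask {
  task: string;
  priority: number; // 1-5, higher runs earlier among ready tasks
  dependencies: string[]; // names of tasks that must come first
}

function orderTasks(tasks: PlannedTask[]): string[] {
  const names = new Set(tasks.map((t) => t.task));
  const done = new Set<string>();
  const remaining = [...tasks];
  const ordered: string[] = [];

  while (remaining.length > 0) {
    // A task is ready once every known dependency has been placed;
    // dependency names that match no task are ignored.
    const ready = remaining.filter((t) =>
      t.dependencies.every((d) => done.has(d) || !names.has(d))
    );
    if (ready.length === 0) throw new Error("Dependency cycle detected");
    ready.sort((a, b) => b.priority - a.priority);
    const next = ready[0];
    ordered.push(next.task);
    done.add(next.task);
    remaining.splice(remaining.indexOf(next), 1);
  }
  return ordered;
}

const order = orderTasks([
  { task: "Deploy", priority: 5, dependencies: ["Build", "Test"] },
  { task: "Test", priority: 3, dependencies: ["Build"] },
  { task: "Build", priority: 2, dependencies: [] },
  { task: "Write docs", priority: 1, dependencies: [] },
]);
console.log(order); // ["Build", "Test", "Deploy", "Write docs"]
```

"Deploy" has the highest priority but still lands after "Build" and "Test", because readiness is checked before priority.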
3. Build the Goal Agent Graph
// app/agents/goal/graph.ts
import { StateGraph, START, END } from "@langchain/langgraph";
import { GoalAgentState } from "./state";
import { decomposeGoal } from "./nodes/decompose";
import { executeSubGoal } from "./nodes/execute";
import { evaluateProgress } from "./nodes/evaluate";
export function createGoalAgent() {
const workflow = new StateGraph(GoalAgentState)
.addNode("decompose", decomposeGoal)
.addNode("execute", executeSubGoal)
.addNode("evaluate", evaluateProgress)
// The graph needs an entry point before it can be compiled
.addEdge(START, "decompose")
.addEdge("decompose", "execute")
.addEdge("execute", "evaluate")
.addConditionalEdges(
"evaluate",
async (state) => {
if (state.goalStatus === "completed") return "end";
if (state.subGoals.length > state.completedSubGoals.length) return "execute";
return "end";
},
{
execute: "execute",
end: END
}
);
return workflow.compile();
}
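The graph orchestrates a loop of goal decomposition, execution, and progress evaluation that runs until every sub-goal is complete.
4. API Route Handler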
// app/api/agent/goal/route.ts
import { NextRequest, NextResponse } from "next/server";
import { createGoalAgent } from "@/agents/goal/graph";
import { HumanMessage } from "@langchain/core/messages";
export async function POST(req: NextRequest) {
const { goal } = await req.json();
const agent = createGoalAgent();
const result = await agent.invoke({
messages: [new HumanMessage(`Achieve this goal: ${goal}`)],
currentGoal: goal
});
return NextResponse.json({
goal,
subGoals: result.subGoals,
completedSubGoals: result.completedSubGoals,
status: result.goalStatus
});
}
A simple API endpoint that triggers goal processing in our serverless environment.
5. Frontend Integration
// app/components/GoalTracker.tsx
"use client";
import { useMutation, useQuery } from "@tanstack/react-query";
import { useState } from "react";
import { groupBy } from "es-toolkit";
export function GoalTracker() {
const [goal, setGoal] = useState("");
const goalMutation = useMutation({
mutationFn: async (goal: string) => {
const res = await fetch("/api/agent/goal", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ goal })
});
return res.json();
}
});
return (
<div className="card bg-base-100 shadow-xl">
<div className="card-body">
<h2 className="card-title">Goal-Setting Agent</h2>
<input
type="text"
placeholder="Enter your goal..."
className="input input-bordered w-full"
value={goal}
onChange={(e) => setGoal(e.target.value)}
/>
<button
className="btn btn-primary"
onClick={() => goalMutation.mutate(goal)}
disabled={goalMutation.isPending}
>
{goalMutation.isPending ? "Processing..." : "Set Goal"}
</button>
{goalMutation.data && (
<div className="mt-4">
<div className="badge badge-outline">
{goalMutation.data.status}
</div>
<div className="divider">Sub-goals</div>
<ul className="space-y-2">
{goalMutation.data.subGoals.map((subGoal: string, idx: number) => (
<li key={idx} className="flex items-center gap-2">
<input
type="checkbox"
className="checkbox checkbox-sm"
checked={goalMutation.data.completedSubGoals.includes(subGoal)}
readOnly
/>
<span className="text-sm">{subGoal}</span>
</li>
))}
</ul>
</div>
)}
</div>
</div>
);
}
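A React component that uses TanStack Query to submit goals and track progress.
Advanced Example: An Autonomous Goal Monitoring System
1. Enhanced State with Monitoring Metrics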
// app/agents/advanced-goal/state.ts
import { Annotation } from "@langchain/langgraph";
import { BaseMessage } from "@langchain/core/messages";
interface GoalMetrics {
startTime: number;
estimatedCompletion: number;
actualProgress: number;
blockers: string[];
adjustments: number;
}
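// Shape of one tracked sub-goal
export interface SubGoal {
  id: string;
  task: string;
  status: "pending" | "in_progress" | "completed" | "failed";
  progress: number;
  dependencies: string[];
  estimatedHours: number;
  actualHours: number;
}
// Nodes may send partial updates, as long as each one carries an id
type SubGoalUpdate = Partial<SubGoal> & { id: string };

export const AdvancedGoalState = Annotation.Root({
  messages: Annotation<BaseMessage[]>({
    reducer: (current, update) => current.concat(update),
    default: () => []
  }),
  currentGoal: Annotation<string>(),
  subGoals: Annotation<SubGoal[], SubGoalUpdate[]>({
    reducer: (current, update) => {
      // Patch existing sub-goals by id, so a partial update such as
      // { id, status, progress } changes one entry instead of replacing the list
      const merged = current.map(goal => {
        const patch = update.find(u => u.id === goal.id);
        return patch ? { ...goal, ...patch } : goal;
      });
      // Updates with an unknown id are treated as new sub-goals (for example, a fresh plan)
      const known = new Set(current.map(goal => goal.id));
      const added = update.filter(u => !known.has(u.id)) as SubGoal[];
      return merged.concat(added);
    },
    default: () => []
  }),
  metrics: Annotation<GoalMetrics, Partial<GoalMetrics>>({
    reducer: (current, update) => ({ ...current, ...update }),
    default: () => ({
      startTime: Date.now(),
      estimatedCompletion: 0,
      actualProgress: 0,
      blockers: [],
      adjustments: 0
    })
  }),
  // goalStatus is read and written by the executor, the monitor, and the graph routing
  goalStatus: Annotation<"planning" | "executing" | "blocked" | "completed" | "failed">({
    reducer: (current, update) => update ?? current,
    default: () => "planning"
  }),
  strategy: Annotation<"waterfall" | "parallel" | "adaptive">({
    reducer: (current, update) => update ?? current,
    default: () => "adaptive"
  }),
  humanApproval: Annotation<boolean>({
    reducer: (current, update) => update ?? current,
    default: () => false
  })
});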
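The advanced state adds detailed metrics, a strategy selection, and a human-in-the-loop flag.
2. Intelligent Goal Planning with Dependencies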
// app/agents/advanced-goal/nodes/planner.ts
import { ChatGoogleGenerativeAI } from "@langchain/google-genai";
import { z } from "zod";
import { partition } from "es-toolkit";
import { v4 as uuidv4 } from "uuid";
import { AdvancedGoalState } from "../state";
const PlanSchema = z.object({
plan: z.object({
approach: z.string(),
reasoning: z.string(),
subGoals: z.array(z.object({
task: z.string(),
estimatedHours: z.number(),
dependencies: z.array(z.string()),
skills: z.array(z.string()),
priority: z.enum(["critical", "high", "medium", "low"])
})),
risks: z.array(z.string()),
successCriteria: z.array(z.string())
})
});
export async function createExecutionPlan(state: typeof AdvancedGoalState.State) {
const model = new ChatGoogleGenerativeAI({
modelName: "gemini-pro",
temperature: 0.2
});
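const planningPrompt = `Create a detailed execution plan for this goal:
Goal: ${state.currentGoal}
Analyze the goal and provide:
1. The overall approach and reasoning
2. Detailed subtasks with time estimates
3. Task dependencies (which tasks must finish before others; reference each dependency by its exact task name)
4. The skills/resources each task requires
5. Potential risks and blockers
6. Clear success criteria
Consider opportunities for parallel execution and critical-path optimization.`;
const response = await model.withStructuredOutput(PlanSchema).invoke([
{ role: "system", content: "You are an expert project planner who specializes in goal decomposition and execution strategy." },
{ role: "user", content: planningPrompt }
]);
// Convert the plan into executable sub-goals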
const subGoals = response.plan.subGoals.map((goal, idx) => ({
id: uuidv4(),
task: goal.task,
status: "pending" as const,
progress: 0,
dependencies: goal.dependencies,
estimatedHours: goal.estimatedHours,
actualHours: 0,
priority: goal.priority
}));
// Estimate the completion time from the critical path (in hours)
const criticalPath = calculateCriticalPath(subGoals);
const estimatedCompletion = Date.now() + (criticalPath * 60 * 60 * 1000);
// Pick a strategy: parallel if most sub-goals have no dependencies, otherwise waterfall
const [independent, dependent] = partition(
subGoals,
g => g.dependencies.length === 0
);
const strategy = independent.length > dependent.length ? "parallel" : "waterfall";
return {
subGoals,
metrics: {
estimatedCompletion,
actualProgress: 0,
blockers: response.plan.risks,
adjustments: 0
},
strategy,
messages: [{
role: "assistant",
content: `Plan created: ${response.plan.approach}\nStrategy: ${strategy}`
}]
};
}
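function calculateCriticalPath(
  goals: Array<{ task: string; estimatedHours: number; dependencies: string[] }>
): number {
  // Longest dependency chain, in hours. Dependencies are referenced by task name.
  const goalMap = new Map(goals.map(g => [g.task, g]));
  const memo = new Map<string, number>();
  const calculatePath = (task: string, inProgress: Set<string>): number => {
    const cached = memo.get(task);
    if (cached !== undefined) return cached;
    const goal = goalMap.get(task);
    // Unknown dependency names and dependency cycles contribute no time
    if (!goal || inProgress.has(task)) return 0;
    inProgress.add(task);
    const depTimes = goal.dependencies.map(d => calculatePath(d, inProgress));
    inProgress.delete(task);
    // Memoizing per task keeps shared dependencies from being counted as 0 on a second visit
    const total = goal.estimatedHours + Math.max(0, ...depTimes);
    memo.set(task, total);
    return total;
  };
  // Math.max(0, ...) yields 0 for an empty plan instead of -Infinity
  return Math.max(0, ...goals.map(g => calculatePath(g.task, new Set())));
}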
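The advanced planner produces a detailed execution plan with dependency analysis and a critical-path estimate.
3. Parallel Execution with Monitoring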
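// app/agents/advanced-goal/nodes/executor.ts
import { Command, Send } from "@langchain/langgraph";
import { take } from "es-toolkit";
import { AdvancedGoalState } from "../state";

export async function parallelExecutor(state: typeof AdvancedGoalState.State) {
  // Find executable sub-goals: pending, with every dependency completed
  const executableGoals = state.subGoals.filter(goal => {
    if (goal.status !== "pending") return false;
    return goal.dependencies.every(depTask => {
      const depGoal = state.subGoals.find(g => g.task === depTask);
      return depGoal?.status === "completed";
    });
  });
  // A node cannot return Send objects as its state update, so routing goes through a Command.
  // This assumes the node is registered with { ends: ["execute_single", "monitor"] }
  // and has no static outgoing edge.
  if (executableGoals.length === 0) {
    return new Command({ update: { goalStatus: "blocked" as const }, goto: "monitor" });
  }
  // Run at most 3 goals in parallel
  const toExecute = take(executableGoals, 3);
  // Fan out with the Send API: one execute_single run per goal
  return new Command({
    update: { goalStatus: "executing" as const },
    goto: toExecute.map(goal =>
      new Send("execute_single", {
        goalId: goal.id,
        task: goal.task,
        parentGoal: state.currentGoal
      })
    )
  });
}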
export async function executeSingleGoal({ goalId, task, parentGoal }: any) {
  // Simulate executing the goal in a few steps, tracking progress along the way
  const steps = 5;
  let progress = 0;
  let actualHours = 0;
  for (let i = 1; i <= steps; i++) {
    await new Promise(resolve => setTimeout(resolve, 100)); // simulate work
    progress = (i / steps) * 100;
    actualHours = (i / steps) * 2;
  }
  // Report the final result as a partial update keyed by the sub-goal id
  return {
    subGoals: [{
      id: goalId,
      status: "completed" as const,
      progress,
      actualHours
    }],
    messages: [{
      role: "assistant",
      content: `Completed: ${task}`
    }]
  };
}
The parallel executor identifies tasks whose dependencies are satisfied and runs them concurrently using LangGraph's Send API.
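The selection rule is easier to see without the graph machinery. The sketch below is a plain-TypeScript simulation, not LangGraph code: `Task`, `nextBatch`, and `simulateWaves` are hypothetical names, each wave stands in for one fan-out, and every task in a wave is assumed to succeed.

```typescript
// Simulates which tasks would be dispatched together, wave by wave.
// All names here are illustrative; the real executor dispatches via Send.
interface Task {
  task: string;
  status: "pending" | "completed";
  dependencies: string[]; // task names that must be completed first
}

// Pending tasks whose dependencies are all completed, capped at `limit`
function nextBatch(tasks: Task[], limit = 3): Task[] {
  const completed = new Set(
    tasks.filter((t) => t.status === "completed").map((t) => t.task)
  );
  return tasks
    .filter((t) => t.status === "pending" && t.dependencies.every((d) => completed.has(d)))
    .slice(0, limit);
}

function simulateWaves(tasks: Task[], limit = 3): string[][] {
  const state = tasks.map((t) => ({ ...t })); // work on a copy
  const waves: string[][] = [];
  for (let batch = nextBatch(state, limit); batch.length > 0; batch = nextBatch(state, limit)) {
    waves.push(batch.map((t) => t.task));
    for (const t of batch) t.status = "completed"; // assume the wave succeeds
  }
  return waves;
}

const waves = simulateWaves([
  { task: "Design", status: "pending", dependencies: [] },
  { task: "Research", status: "pending", dependencies: [] },
  { task: "Setup repo", status: "pending", dependencies: [] },
  { task: "Write copy", status: "pending", dependencies: [] },
  { task: "Build", status: "pending", dependencies: ["Design", "Setup repo"] },
  { task: "Launch", status: "pending", dependencies: ["Build", "Write copy"] },
]);
console.log(waves);
// [["Design", "Research", "Setup repo"], ["Write copy", "Build"], ["Launch"]]
```

"Write copy" has no dependencies but still waits for the second wave because of the batch limit of 3; if the loop ends with tasks still pending, those tasks are blocked.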
4. Real-Time Monitoring and Adaptation
// app/agents/advanced-goal/nodes/monitor.ts
import { ChatGoogleGenerativeAI } from "@langchain/google-genai";
import { sum } from "es-toolkit";
import { AdvancedGoalState } from "../state";

export async function monitorAndAdapt(state: typeof AdvancedGoalState.State) {
  const model = new ChatGoogleGenerativeAI({ modelName: "gemini-pro" });
  // Compute progress metrics (guarding against an empty plan)
  const completedCount = state.subGoals.filter(g => g.status === "completed").length;
  const totalCount = state.subGoals.length;
  const overallProgress = totalCount > 0 ? (completedCount / totalCount) * 100 : 0;
  // Check for delays: positive variance means more hours spent than estimated
  const actualHours = sum(state.subGoals.map(g => g.actualHours));
  const estimatedHours = sum(state.subGoals.map(g => g.estimatedHours));
  const scheduleVariance = estimatedHours > 0
    ? ((actualHours - estimatedHours) / estimatedHours) * 100
    : 0;
  // Identify blockers: tasks that started but have made little progress
  const stuckGoals = state.subGoals.filter(
    g => g.status === "in_progress" && g.progress < 30
  );
  // Adaptive strategy adjustment
  if (scheduleVariance > 20 || stuckGoals.length > 0) {
    const adaptationPrompt = `Current goal progress:
- Overall: ${overallProgress.toFixed(1)}%
- Schedule variance: ${scheduleVariance.toFixed(1)}%
- Stuck tasks: ${stuckGoals.map(g => g.task).join(", ")}
Suggest strategy adjustments to get back on track.`;
    const response = await model.invoke([
      { role: "system", content: "You are a project recovery expert." },
      { role: "user", content: adaptationPrompt }
    ]);
    // Trigger replanning if the schedule has slipped badly
    if (scheduleVariance > 50) {
      return {
        strategy: "adaptive" as const,
        metrics: {
          adjustments: state.metrics.adjustments + 1,
          actualProgress: overallProgress
        },
        messages: [response],
        humanApproval: true // request human approval for major changes
      };
    }
  }
  // Keep a "blocked" status set by the executor so the graph can route back to planning
  const goalStatus =
    overallProgress === 100 ? "completed"
    : state.goalStatus === "blocked" ? "blocked"
    : "executing";
  return {
    metrics: {
      actualProgress: overallProgress
    },
    goalStatus
  };
}
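The monitor tracks progress on every pass, identifies bottlenecks, and triggers adaptive replanning when the schedule variance exceeds 50%.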
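As a worked example of the thresholds, the sketch below applies the same arithmetic to fixed numbers. `TrackedGoal`, `assess`, and the three action labels are hypothetical names introduced for illustration; the 20% and 50% cut-offs are the ones used by the monitor node.

```typescript
// Worked example of the monitor's schedule-variance arithmetic.
// `assess` and its action labels are illustrative, not part of the article's code.
interface TrackedGoal {
  estimatedHours: number;
  actualHours: number;
}

type Action = "continue" | "advise" | "replan";

function assess(goals: TrackedGoal[]): { scheduleVariance: number; action: Action } {
  const estimated = goals.reduce((total, g) => total + g.estimatedHours, 0);
  const actual = goals.reduce((total, g) => total + g.actualHours, 0);
  // Percentage by which actual hours exceed the estimate (negative means ahead of plan)
  const scheduleVariance = estimated > 0 ? ((actual - estimated) / estimated) * 100 : 0;
  const action: Action =
    scheduleVariance > 50 ? "replan" : scheduleVariance > 20 ? "advise" : "continue";
  return { scheduleVariance, action };
}

// 20 estimated hours in total
const onTrack = assess([
  { estimatedHours: 8, actualHours: 8 },
  { estimatedHours: 12, actualHours: 13 },
]); // 21 actual hours: (21 - 20) / 20 = 5%

const slipping = assess([
  { estimatedHours: 8, actualHours: 10 },
  { estimatedHours: 12, actualHours: 15 },
]); // 25 actual hours: (25 - 20) / 20 = 25%

const offTheRails = assess([
  { estimatedHours: 8, actualHours: 15 },
  { estimatedHours: 12, actualHours: 20 },
]); // 35 actual hours: (35 - 20) / 20 = 75%

console.log(onTrack.action, slipping.action, offTheRails.action);
```

With these inputs the three cases fall on either side of both thresholds: 5% continues, 25% asks the model for advice, and 75% triggers replanning with human approval.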
5. Persistence Layer for Long-Running Goals
// app/agents/advanced-goal/persistence.ts
import { kv } from "@vercel/kv";
import { SqliteSaver } from "@langchain/langgraph-checkpoint-sqlite";
export class GoalPersistence {
private checkpointer: SqliteSaver;
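constructor() {
// Create the checkpointer from a connection string.
// Note: /tmp is ephemeral on serverless platforms, so this file does not outlive the instance.
this.checkpointer = SqliteSaver.fromConnString("/tmp/goals.db");
}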
async saveGoalState(goalId: string, state: any) {
// Save to Vercel KV for fast access
await kv.hset(`goal:${goalId}`, {
currentGoal: state.currentGoal,
status: state.goalStatus,
progress: state.metrics.actualProgress,
subGoals: JSON.stringify(state.subGoals),
lastUpdated: Date.now()
});
// Expire after 30 days
await kv.expire(`goal:${goalId}`, 30 * 24 * 60 * 60);
}
async getGoalState(goalId: string) {
const state = await kv.hgetall(`goal:${goalId}`);
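if (state && typeof state.subGoals === "string") {
// The client may already have deserialized the JSON value on read; only parse raw strings
state.subGoals = JSON.parse(state.subGoals);
}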
return state;
}
async getActiveGoals(userId: string) {
const pattern = `goal:${userId}:*`;
const keys = await kv.keys(pattern);
const goals = await Promise.all(
keys.map(key => kv.hgetall(key))
);
return goals.filter(g => g && g.status !== "completed");
}
}
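The persistence layer stores state in Vercel KV, so a goal can be tracked across separate serverless function invocations.
6. The Complete Graph with Human-in-the-Loop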
// app/agents/advanced-goal/graph.ts
import { StateGraph, START, END, interrupt } from "@langchain/langgraph";
import { SqliteSaver } from "@langchain/langgraph-checkpoint-sqlite";
import { AdvancedGoalState } from "./state";
import { createExecutionPlan } from "./nodes/planner";
import { parallelExecutor, executeSingleGoal } from "./nodes/executor";
import { monitorAndAdapt } from "./nodes/monitor";

export function createAdvancedGoalAgent() {
  const workflow = new StateGraph(AdvancedGoalState)
    .addNode("plan", createExecutionPlan)
    // parallel_execute chooses its own successors (a fan-out to execute_single, or monitor
    // when nothing is runnable), so they are declared via `ends` rather than static edges
    .addNode("parallel_execute", parallelExecutor, {
      ends: ["execute_single", "monitor"]
    })
    .addNode("execute_single", executeSingleGoal)
    .addNode("monitor", monitorAndAdapt)
    .addNode("human_review", async (state) => {
      // Pause for human input; execution continues from here once the run is resumed
      interrupt({
        message: "Plan adjustment needed",
        currentPlan: state.subGoals,
        suggestedChanges: state.messages[state.messages.length - 1]
      });
      // Clear the flag so the monitor does not route straight back to review
      return { humanApproval: false };
    })
    // Define edges
    .addEdge(START, "plan")
    .addEdge("plan", "parallel_execute")
    .addEdge("execute_single", "monitor")
    .addConditionalEdges(
      "monitor",
      async (state) => {
        if (state.humanApproval) return "human_review";
        if (state.goalStatus === "completed") return "end";
        if (state.goalStatus === "blocked") return "plan";
        return "parallel_execute";
      },
      {
        human_review: "human_review",
        plan: "plan",
        parallel_execute: "parallel_execute",
        end: END
      }
    )
    .addEdge("human_review", "plan");
  return workflow.compile({
    checkpointer: SqliteSaver.fromConnString("/tmp/advanced_goals.db")
  });
}
The complete graph orchestrates planning, parallel execution, monitoring, and human intervention when it is needed.
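The routing decision after the monitor node is order-sensitive: a pending human review wins over completion, and completion wins over a blocked status. One way to keep that precedence testable is to pull the decision into a named pure function, sketched here with a hypothetical `routeAfterMonitor` and a reduced `RoutingInput` type that holds only the two fields the decision reads.

```typescript
// The monitor's routing precedence as a standalone, unit-testable function.
// `RoutingInput` and `routeAfterMonitor` are illustrative names.
type Route = "human_review" | "end" | "plan" | "parallel_execute";

interface RoutingInput {
  humanApproval: boolean;
  goalStatus: "planning" | "executing" | "blocked" | "completed" | "failed";
}

function routeAfterMonitor(state: RoutingInput): Route {
  if (state.humanApproval) return "human_review"; // major changes wait for a person
  if (state.goalStatus === "completed") return "end";
  if (state.goalStatus === "blocked") return "plan"; // nothing runnable: replan
  return "parallel_execute"; // otherwise dispatch the next batch
}

console.log(routeAfterMonitor({ humanApproval: true, goalStatus: "completed" })); // "human_review"
console.log(routeAfterMonitor({ humanApproval: false, goalStatus: "blocked" })); // "plan"
```

A function like this could be passed to `addConditionalEdges` in place of the inline arrow function, with the same route-to-node mapping.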
7. Real-Time Streaming API
// app/api/agent/advanced-goal/stream/route.ts
import { NextRequest } from "next/server";
import { createAdvancedGoalAgent } from "@/agents/advanced-goal/graph";
import { GoalPersistence } from "@/agents/advanced-goal/persistence";
export async function POST(req: NextRequest) {
const { goal, userId } = await req.json();
const goalId = `${userId}:${Date.now()}`;
const persistence = new GoalPersistence();
const encoder = new TextEncoder();
const stream = new ReadableStream({
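async start(controller) {
  const agent = createAdvancedGoalAgent();
  const config = { configurable: { thread_id: goalId } };
  try {
    // stream() resolves to an async iterable, so it must be awaited before iterating
    const events = await agent.stream(
      { currentGoal: goal, messages: [] },
      { ...config, streamMode: "updates" }
    );
    for await (const event of events) {
      let data = event;
      if (event.monitor) {
        // "updates" events carry only the fields a node changed, so read the
        // full state back from the checkpointer before saving it
        const snapshot = await agent.getState(config);
        await persistence.saveGoalState(goalId, snapshot.values);
        // Attach the full sub-goal list so the dashboard can render it
        data = {
          ...event,
          monitor: { ...event.monitor, subGoals: snapshot.values.subGoals }
        };
      }
      // Send the progress update as one server-sent event
      const update = { type: "progress", goalId, data };
      controller.enqueue(
        encoder.encode(`data: ${JSON.stringify(update)}\n\n`)
      );
    }
    controller.close();
  } catch (error) {
    // Surface failures to the client instead of leaving the stream hanging
    controller.error(error);
  }
}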
});
return new Response(stream, {
headers: {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
"Connection": "keep-alive",
},
});
}
The streaming API emits server-sent events while the goal is being processed, which makes it a good fit for progress-tracking UIs.
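Each update travels as a `data: <json>` line followed by a blank line. A reader cannot assume that one network chunk equals one event, so a client usually buffers until it sees the blank-line delimiter. The sketch below shows that framing in isolation; `encodeEvent` and `SseParser` are hypothetical helpers, and it handles only the single `data:` field this route emits, not the full server-sent events format.

```typescript
// Minimal framing helpers for the "data: <json>\n\n" events this route emits.
// `encodeEvent` and `SseParser` are illustrative names, not library APIs.
function encodeEvent(payload: unknown): string {
  return `data: ${JSON.stringify(payload)}\n\n`;
}

class SseParser {
  private buffer = "";

  // Feed one decoded chunk; returns every event completed by that chunk
  push(chunk: string): unknown[] {
    this.buffer += chunk;
    const frames = this.buffer.split("\n\n");
    // The last piece is either empty or an incomplete event: keep it for later
    this.buffer = frames.pop() ?? "";
    return frames
      .filter((frame) => frame.startsWith("data: "))
      .map((frame) => JSON.parse(frame.slice("data: ".length)));
  }
}

const wire =
  encodeEvent({ type: "progress", step: 1 }) +
  encodeEvent({ type: "progress", step: 2 });

// Simulate the network splitting the stream in the middle of the first event
const parser = new SseParser();
const firstChunkEvents = parser.push(wire.slice(0, 20));
const secondChunkEvents = parser.push(wire.slice(20));
console.log(firstChunkEvents.length, secondChunkEvents.length); // 0 2
```

Because `JSON.stringify` never emits a raw newline, the blank-line delimiter cannot appear inside a payload, which is what makes splitting on `"\n\n"` safe here.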
8. Advanced Frontend Dashboard
// app/components/AdvancedGoalDashboard.tsx
"use client";
import { useState, useEffect } from "react";
import { useQuery, useMutation } from "@tanstack/react-query";
import { groupBy } from "es-toolkit";
interface Goal {
id: string;
task: string;
status: string;
progress: number;
dependencies: string[];
}
export function AdvancedGoalDashboard() {
const [goalInput, setGoalInput] = useState("");
const [streamData, setStreamData] = useState<any[]>([]);
const startGoal = useMutation({
mutationFn: async (goal: string) => {
const response = await fetch("/api/agent/advanced-goal/stream", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ goal, userId: "user123" })
});
if (!response.body) throw new Error("No stream");
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = "";
while (true) {
  const { done, value } = await reader.read();
  if (done) break;
  // A network chunk can end mid-event, so buffer until the blank-line delimiter
  buffer += decoder.decode(value, { stream: true });
  const events = buffer.split("\n\n");
  buffer = events.pop() ?? "";
  for (const event of events) {
    if (event.startsWith("data: ")) {
      const data = JSON.parse(event.slice(6));
      setStreamData(prev => [...prev, data]);
    }
  }
}
}
});
// Group goals by status, using the most recent update that carries sub-goals
const latestUpdate = [...streamData].reverse().find(u => u.data?.monitor?.subGoals);
const subGoals = latestUpdate?.data?.monitor?.subGoals || [];
const goalsByStatus = groupBy(subGoals, (g: Goal) => g.status);
return (
<div className="container mx-auto p-4">
<div className="card bg-base-100 shadow-xl mb-6">
<div className="card-body">
<h2 className="card-title text-2xl">Advanced Goal Setting and Monitoring</h2>
<div className="form-control">
<div className="input-group">
<input
type="text"
placeholder="Enter your goal..."
className="input input-bordered flex-1"
value={goalInput}
onChange={(e) => setGoalInput(e.target.value)}
/>
<button
className="btn btn-primary"
onClick={() => startGoal.mutate(goalInput)}
disabled={startGoal.isPending}
>
{startGoal.isPending ? (
<span className="loading loading-spinner"></span>
) : (
"Start Goal"
)}
</button>
</div>
</div>
</div>
</div>
{subGoals.length > 0 && (
<div className="grid grid-cols-1 md:grid-cols-3 gap-4">
{/* Pending tasks */}
<div className="card bg-base-200">
<div className="card-body">
<h3 className="card-title text-lg">
Pending ({goalsByStatus.pending?.length || 0})
</h3>
<div className="space-y-2">
{(goalsByStatus.pending || []).map((goal: Goal) => (
<div key={goal.id} className="bg-base-100 p-3 rounded">
<div className="text-sm font-medium">{goal.task}</div>
{goal.dependencies.length > 0 && (
<div className="text-xs text-base-content/60 mt-1">
Waiting on: {goal.dependencies.join(", ")}
</div>
)}
</div>
))}
</div>
</div>
</div>
{/* In progress */}
<div className="card bg-warning/20">
<div className="card-body">
<h3 className="card-title text-lg">
In Progress ({goalsByStatus.in_progress?.length || 0})
</h3>
<div className="space-y-2">
{(goalsByStatus.in_progress || []).map((goal: Goal) => (
<div key={goal.id} className="bg-base-100 p-3 rounded">
<div className="text-sm font-medium">{goal.task}</div>
<progress
className="progress progress-warning w-full mt-2"
value={goal.progress}
max="100"
/>
<div className="text-xs text-right mt-1">
{goal.progress}%
</div>
</div>
))}
</div>
</div>
</div>
{/* Completed */}
<div className="card bg-success/20">
<div className="card-body">
<h3 className="card-title text-lg">
Completed ({goalsByStatus.completed?.length || 0})
</h3>
<div className="space-y-2">
{(goalsByStatus.completed || []).map((goal: Goal) => (
<div key={goal.id} className="bg-base-100 p-3 rounded">
<div className="text-sm font-medium line-through">
{goal.task}
</div>
<div className="badge badge-success badge-sm mt-1">
Done
</div>
</div>
))}
</div>
</div>
</div>
</div>
)}
{/* Live update feed */}
{streamData.length > 0 && (
<div className="card bg-base-100 shadow-xl mt-6">
<div className="card-body">
<h3 className="card-title">Live Updates</h3>
<div className="h-48 overflow-y-auto space-y-1">
{streamData.slice(-10).reverse().map((update, idx) => (
<div key={idx} className="text-sm">
<span className="badge badge-ghost badge-sm mr-2">
{new Date().toLocaleTimeString()}
</span>
<span className="text-base-content/80">
{update.type}: {JSON.stringify(update.data).slice(0, 100)}...
</span>
</div>
))}
</div>
</div>
</div>
)}
</div>
);
}
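A full-featured dashboard with live streaming updates, a Kanban-style task view, and progress tracking.
Conclusion
Goal setting, as an agentic design pattern, turns a static objective into a dynamic, self-managing system. By combining LangGraph's graph-based orchestration, TypeScript's type safety, and Vercel's serverless infrastructure, we can build agents that decompose goals, track progress, and adjust strategy on their own. The strength of the pattern lies in pairing structured planning with adaptive execution: like a good project manager, the agent plans thoroughly, executes efficiently, and changes course when necessary, while remaining observable and open to human oversight.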