草案 智能体设计模式 - 优先级排序
学习如何使用TypeScript中的LangChain和LangGraph构建智能任务优先级系统,针对Vercel的无服务器平台进行优化。本指南演示了动态优先级队列、加权评分算法以及生产环境AI代理的自适应任务管理的实现。
心智模型:急诊室分诊系统
将代理优先级排序想象成急诊室的分诊系统。就像医务人员根据严重程度、紧急程度和可用资源评估进来的患者一样,您的AI代理必须使用多种标准来评估任务。在医院里,胸痛患者会比轻微割伤患者获得优先治疗 - 同样,您的代理会优先处理关键客户问题而非例行查询。分诊护士(您的优先级算法)会在新患者到达时持续重新评估(动态重新排序),专门团队处理不同的严重程度级别(基于优先级的路由)。这个心智模型帮助您理解为什么简单的FIFO队列在复杂场景中会失败,以及智能优先级排序如何显著提高系统效率。
基础示例:使用LangGraph的任务优先级队列
1. 定义优先级状态和类型
// types/priority.ts
import { z } from 'zod';
import { BaseMessage } from '@langchain/core/messages';
// 使用枚举实现类型安全的优先级级别
export enum Priority {
CRITICAL = 0, // 最高优先级
HIGH = 1,
MEDIUM = 2,
LOW = 3,
BACKGROUND = 4 // 最低优先级
}
// 使用Zod验证的任务模式
export const TaskSchema = z.object({
id: z.string(),
description: z.string(),
priority: z.nativeEnum(Priority),
deadline: z.date().optional(),
dependencies: z.array(z.string()).default([]),
assignee: z.string().optional(),
createdAt: z.date(),
metadata: z.record(z.any()).default({})
});
export type Task = z.infer<typeof TaskSchema>;
// LangGraph工作流的状态
export interface PriorityState {
tasks: Task[];
currentTask: Task | null;
completedTasks: string[];
messages: BaseMessage[];
metadata: Record<string, any>;
}
定义强类型的优先级级别和任务结构,通过Zod验证确保整个优先级系统的类型安全。
2. 使用堆实现优先级队列
// lib/priority-queue.ts
import { sortBy, remove } from 'es-toolkit';
import { Task, Priority } from '@/types/priority';
export class PriorityQueue {
private tasks: Task[] = [];
private maxSize: number = 10000;
// 添加任务同时保持堆属性
enqueue(task: Task): void {
if (this.tasks.length >= this.maxSize) {
// 删除低优先级的旧任务
const oldTasks = this.tasks.filter(t =>
t.priority === Priority.BACKGROUND &&
Date.now() - t.createdAt.getTime() > 3600000 // 1小时
);
if (oldTasks.length > 0) {
remove(this.tasks, t => t.id === oldTasks[0].id);
}
}
this.tasks.push(task);
this.heapifyUp();
}
// 提取最高优先级的任务
dequeue(): Task | null {
if (this.tasks.length === 0) return null;
const task = this.tasks[0];
const last = this.tasks.pop();
if (this.tasks.length > 0 && last) {
this.tasks[0] = last;
this.heapifyDown();
}
return task;
}
private heapifyUp(): void {
let index = this.tasks.length - 1;
while (index > 0) {
const parentIndex = Math.floor((index - 1) / 2);
if (this.compare(this.tasks[index], this.tasks[parentIndex]) >= 0) {
break;
}
[this.tasks[index], this.tasks[parentIndex]] =
[this.tasks[parentIndex], this.tasks[index]];
index = parentIndex;
}
}
private heapifyDown(): void {
let index = 0;
while (2 * index + 1 < this.tasks.length) {
const leftChild = 2 * index + 1;
const rightChild = 2 * index + 2;
let smallest = index;
if (this.compare(this.tasks[leftChild], this.tasks[smallest]) < 0) {
smallest = leftChild;
}
if (rightChild < this.tasks.length &&
this.compare(this.tasks[rightChild], this.tasks[smallest]) < 0) {
smallest = rightChild;
}
if (smallest === index) break;
[this.tasks[index], this.tasks[smallest]] =
[this.tasks[smallest], this.tasks[index]];
index = smallest;
}
}
// 比较任务以进行优先级排序
private compare(a: Task, b: Task): number {
// 首先按优先级级别比较
if (a.priority !== b.priority) {
return a.priority - b.priority;
}
// 然后按截止日期比较(如果存在)
if (a.deadline && b.deadline) {
return a.deadline.getTime() - b.deadline.getTime();
}
if (a.deadline) return -1;
if (b.deadline) return 1;
// 最后按创建时间比较(相同优先级采用FIFO)
return a.createdAt.getTime() - b.createdAt.getTime();
}
// 获取按优先级排序的所有任务
getTasks(): Task[] {
return sortBy(this.tasks, [
t => t.priority,
t => t.deadline?.getTime() || Infinity,
t => t.createdAt.getTime()
]);
}
}
实现具有O(log n)操作的高效最小堆优先级队列,自动清理旧的低优先级任务,并进行多标准比较。
3. 创建优先级分配代理
// lib/agents/priority-agent.ts
import { ChatGoogleGenerativeAI } from '@langchain/google-genai';
import { HumanMessage, SystemMessage } from '@langchain/core/messages';
import { Priority, Task } from '@/types/priority';
import { chunk } from 'es-toolkit';
export class PriorityAssignmentAgent {
private model: ChatGoogleGenerativeAI;
constructor() {
this.model = new ChatGoogleGenerativeAI({
modelName: 'gemini-2.5-flash',
temperature: 0,
maxOutputTokens: 1024,
});
}
async assignPriority(taskDescription: string): Promise<Priority> {
const systemPrompt = `您是任务优先级专家。分析任务并分配优先级级别。
优先级级别:
- CRITICAL (0): 系统故障、安全漏洞、数据丢失风险、生产环境宕机
- HIGH (1): 客户面临的问题、收入影响、SLA违规
- MEDIUM (2): 重要功能、非关键错误、标准请求
- LOW (3): 锦上添花的功能、小改进、文档
- BACKGROUND (4): 清理任务、优化、研究
仅用优先级级别名称回复。`;
const response = await this.model.invoke([
new SystemMessage(systemPrompt),
new HumanMessage(`任务: ${taskDescription}`)
]);
const content = response.content.toString().trim().toUpperCase();
// 将响应映射到Priority枚举
switch (content) {
case 'CRITICAL': return Priority.CRITICAL;
case 'HIGH': return Priority.HIGH;
case 'MEDIUM': return Priority.MEDIUM;
case 'LOW': return Priority.LOW;
case 'BACKGROUND': return Priority.BACKGROUND;
default: return Priority.MEDIUM; // 安全的默认值
}
}
async batchAssignPriorities(tasks: string[]): Promise<Priority[]> {
// 分块处理以避免速率限制
const chunks = chunk(tasks, 5);
const results: Priority[] = [];
for (const batch of chunks) {
const promises = batch.map(task => this.assignPriority(task));
const priorities = await Promise.all(promises);
results.push(...priorities);
}
return results;
}
}
使用Gemini Flash智能分析任务描述,并根据业务影响和紧急程度分配适当的优先级级别。
4. 构建LangGraph工作流
// lib/workflows/priority-workflow.ts
import { StateGraph, END } from '@langchain/langgraph';
import { PriorityState, Task, Priority } from '@/types/priority';
import { PriorityQueue } from '@/lib/priority-queue';
import { PriorityAssignmentAgent } from '@/lib/agents/priority-agent';
import { v4 as uuidv4 } from 'uuid';
export function createPriorityWorkflow() {
const queue = new PriorityQueue();
const agent = new PriorityAssignmentAgent();
const workflow = new StateGraph<PriorityState>({
channels: {
tasks: {
value: (x: Task[], y: Task[]) => [...x, ...y],
default: () => [],
},
currentTask: {
value: (x: Task | null, y: Task | null) => y || x,
default: () => null,
},
completedTasks: {
value: (x: string[], y: string[]) => [...x, ...y],
default: () => [],
},
messages: {
value: (x, y) => [...x, ...y],
default: () => [],
},
metadata: {
value: (x, y) => ({...x, ...y}),
default: () => ({}),
},
},
});
// 节点:接收和优先级排序新任务
workflow.addNode('receive', async (state) => {
const { messages } = state;
const lastMessage = messages[messages.length - 1];
if (!lastMessage?.content) {
return { metadata: { error: '未提供任务描述' } };
}
const taskDescription = lastMessage.content.toString();
const priority = await agent.assignPriority(taskDescription);
const newTask: Task = {
id: uuidv4(),
description: taskDescription,
priority,
createdAt: new Date(),
dependencies: [],
metadata: {}
};
queue.enqueue(newTask);
return {
tasks: [newTask],
metadata: {
lastAdded: newTask.id,
queueSize: queue.getTasks().length
}
};
});
// 节点:从队列中选择下一个任务
workflow.addNode('select', async (state) => {
const nextTask = queue.dequeue();
if (!nextTask) {
return {
currentTask: null,
metadata: { status: 'queue_empty' }
};
}
return {
currentTask: nextTask,
metadata: {
status: 'task_selected',
selectedId: nextTask.id,
selectedPriority: Priority[nextTask.priority]
}
};
});
// 节点:处理当前任务
workflow.addNode('process', async (state) => {
const { currentTask } = state;
if (!currentTask) {
return { metadata: { error: '没有要处理的任务' } };
}
// 模拟任务处理
await new Promise(resolve => setTimeout(resolve, 100));
return {
completedTasks: [currentTask.id],
currentTask: null,
metadata: {
status: 'task_completed',
completedId: currentTask.id
}
};
});
// 定义边
workflow.addEdge('receive', 'select');
workflow.addEdge('select', 'process');
workflow.addEdge('process', END);
workflow.setEntryPoint('receive');
return workflow.compile();
}
创建一个有状态的工作流,用于接收任务、分配优先级、管理队列选择以及按优先级顺序处理任务。
5. 优先级队列的API路由
// app/api/priority/route.ts
import { createPriorityWorkflow } from '@/lib/workflows/priority-workflow';
import { HumanMessage } from '@langchain/core/messages';
import { NextResponse } from 'next/server';
export const runtime = 'nodejs';
export const maxDuration = 60;
export async function POST(req: Request) {
try {
const { task } = await req.json();
if (!task) {
return NextResponse.json(
{ error: '需要任务描述' },
{ status: 400 }
);
}
const workflow = createPriorityWorkflow();
const result = await workflow.invoke({
tasks: [],
currentTask: null,
completedTasks: [],
messages: [new HumanMessage(task)],
metadata: {}
});
return NextResponse.json({
taskAdded: result.tasks[0],
queueSize: result.metadata.queueSize,
status: result.metadata.status
});
} catch (error) {
console.error('优先级工作流错误:', error);
return NextResponse.json(
{ error: '无法处理任务' },
{ status: 500 }
);
}
}
将优先级工作流公开为REST API端点,接受任务描述并返回带有队列状态的优先级分配。
高级示例:动态多代理优先级系统
1. 使用多个标准的加权评分
// lib/scoring/weighted-scorer.ts
import { Task } from '@/types/priority';
import { sum, map, zip } from 'es-toolkit';
interface ScoringCriteria {
urgency: number; // 0-10 比例
impact: number; // 0-10 比例
effort: number; // 0-10 比例(反向)
confidence: number; // 0-10 比例
customerValue: number; // 0-10 比例
}
interface ScoringWeights {
urgency: number;
impact: number;
effort: number;
confidence: number;
customerValue: number;
}
export class WeightedPriorityScorer {
private weights: ScoringWeights;
constructor(weights?: Partial<ScoringWeights>) {
// 默认权重总和为1.0
this.weights = {
urgency: weights?.urgency ?? 0.3,
impact: weights?.impact ?? 0.25,
effort: weights?.effort ?? 0.15,
confidence: weights?.confidence ?? 0.15,
customerValue: weights?.customerValue ?? 0.15,
};
}
// 计算加权优先级分数
calculateScore(criteria: ScoringCriteria): number {
const scores = [
criteria.urgency * this.weights.urgency,
criteria.impact * this.weights.impact,
(10 - criteria.effort) * this.weights.effort, // 反转努力值
criteria.confidence * this.weights.confidence,
criteria.customerValue * this.weights.customerValue,
];
return sum(scores);
}
// 使用LLM从任务提取评分标准
async extractCriteria(
task: Task,
model: ChatGoogleGenerativeAI
): Promise<ScoringCriteria> {
const prompt = `分析此任务并对每个标准进行0-10评分:
任务: ${task.description}
标准:
- 紧急性: 时间敏感程度?(10 = 立即,0 = 可以等待)
- 影响: 业务/用户影响?(10 = 关键,0 = 最小)
- 努力: 实施复杂性?(10 = 非常复杂,0 = 简单)
- 信心: 需求清晰度?(10 = 非常清晰,0 = 模糊)
- 客户价值: 直接客户利益?(10 = 高价值,0 = 仅内部)
以JSON格式回复: {"urgency": X, "impact": Y, ...}`;
const response = await model.invoke([new HumanMessage(prompt)]);
const content = response.content.toString();
try {
const parsed = JSON.parse(content);
return {
urgency: parsed.urgency || 5,
impact: parsed.impact || 5,
effort: parsed.effort || 5,
confidence: parsed.confidence || 5,
customerValue: parsed.customerValue || 5,
};
} catch {
// 解析错误时默认为中等分数
return {
urgency: 5,
impact: 5,
effort: 5,
confidence: 5,
customerValue: 5,
};
}
}
// 根据上下文动态调整权重
adjustWeights(context: {
isEmergency?: boolean;
isCustomerFacing?: boolean;
resourcesLimited?: boolean;
}): void {
if (context.isEmergency) {
this.weights.urgency = 0.5;
this.weights.impact = 0.3;
this.normalizeWeights();
}
if (context.isCustomerFacing) {
this.weights.customerValue = 0.3;
this.normalizeWeights();
}
if (context.resourcesLimited) {
this.weights.effort = 0.25;
this.normalizeWeights();
}
}
private normalizeWeights(): void {
const total = sum(Object.values(this.weights));
for (const key in this.weights) {
this.weights[key as keyof ScoringWeights] /= total;
}
}
}
实现RICE风格的加权评分,具有基于业务上下文的动态权重调整和基于LLM的标准提取。
2. 多代理优先级路由器
// lib/agents/priority-router.ts
import { StateGraph, END } from '@langchain/langgraph';
import { ChatGoogleGenerativeAI } from '@langchain/google-genai';
import { Priority, Task, PriorityState } from '@/types/priority';
import { groupBy, maxBy } from 'es-toolkit';
interface RouterState extends PriorityState {
routedTasks: Record<string, Task[]>;
agentAssignments: Record<string, string>;
}
export class PriorityRouter {
private criticalAgent: ChatGoogleGenerativeAI;
private standardAgent: ChatGoogleGenerativeAI;
private backgroundAgent: ChatGoogleGenerativeAI;
constructor() {
// 不同优先级级别使用不同的模型
this.criticalAgent = new ChatGoogleGenerativeAI({
modelName: 'gemini-2.5-pro',
temperature: 0,
maxOutputTokens: 4096,
});
this.standardAgent = new ChatGoogleGenerativeAI({
modelName: 'gemini-2.5-flash',
temperature: 0.3,
maxOutputTokens: 2048,
});
this.backgroundAgent = new ChatGoogleGenerativeAI({
modelName: 'gemini-2.5-flash',
temperature: 0.5,
maxOutputTokens: 1024,
});
}
createRouterWorkflow() {
const workflow = new StateGraph<RouterState>({
channels: {
tasks: {
value: (x, y) => [...x, ...y],
default: () => [],
},
routedTasks: {
value: (x, y) => ({...x, ...y}),
default: () => ({}),
},
agentAssignments: {
value: (x, y) => ({...x, ...y}),
default: () => ({}),
},
currentTask: {
value: (x, y) => y || x,
default: () => null,
},
completedTasks: {
value: (x, y) => [...x, ...y],
default: () => [],
},
messages: {
value: (x, y) => [...x, ...y],
default: () => [],
},
metadata: {
value: (x, y) => ({...x, ...y}),
default: () => ({}),
},
},
});
// 将任务路由到适当的队列
workflow.addNode('route', async (state) => {
const { tasks } = state;
// 按优先级分组任务
const grouped = groupBy(tasks, t => Priority[t.priority]);
const routed: Record<string, Task[]> = {
critical: grouped['CRITICAL'] || [],
high: grouped['HIGH'] || [],
standard: [...(grouped['MEDIUM'] || []), ...(grouped['LOW'] || [])],
background: grouped['BACKGROUND'] || [],
};
// 根据优先级分配代理
const assignments: Record<string, string> = {};
for (const task of tasks) {
if (task.priority === Priority.CRITICAL) {
assignments[task.id] = 'critical-agent';
} else if (task.priority === Priority.HIGH) {
assignments[task.id] = 'high-priority-agent';
} else if (task.priority <= Priority.LOW) {
assignments[task.id] = 'standard-agent';
} else {
assignments[task.id] = 'background-agent';
}
}
return {
routedTasks: routed,
agentAssignments: assignments,
metadata: {
criticalCount: routed.critical.length,
highCount: routed.high.length,
standardCount: routed.standard.length,
backgroundCount: routed.background.length,
}
};
});
// 使用专用资源处理关键任务
workflow.addNode('process-critical', async (state) => {
const { routedTasks } = state;
const criticalTasks = routedTasks.critical || [];
if (criticalTasks.length === 0) {
return { metadata: { criticalStatus: 'none' } };
}
// 使用高性能模型处理
const results = await Promise.all(
criticalTasks.map(async task => {
const response = await this.criticalAgent.invoke([
new HumanMessage(`关键任务: ${task.description}`)
]);
return {
taskId: task.id,
result: response.content,
processingTime: Date.now()
};
})
);
return {
completedTasks: criticalTasks.map(t => t.id),
metadata: {
criticalProcessed: results.length,
criticalResults: results
}
};
});
// 批量处理标准任务
workflow.addNode('process-standard', async (state) => {
const { routedTasks } = state;
const standardTasks = routedTasks.standard || [];
if (standardTasks.length === 0) {
return { metadata: { standardStatus: 'none' } };
}
// 批处理以提高效率
const batchSize = 5;
const completed: string[] = [];
for (let i = 0; i < standardTasks.length; i += batchSize) {
const batch = standardTasks.slice(i, i + batchSize);
const batchPromises = batch.map(task =>
this.standardAgent.invoke([
new HumanMessage(`任务: ${task.description}`)
])
);
await Promise.all(batchPromises);
completed.push(...batch.map(t => t.id));
}
return {
completedTasks: completed,
metadata: { standardProcessed: completed.length }
};
});
// 安排后台任务
workflow.addNode('schedule-background', async (state) => {
const { routedTasks } = state;
const backgroundTasks = routedTasks.background || [];
if (backgroundTasks.length === 0) {
return { metadata: { backgroundStatus: 'none' } };
}
// 排队进行非高峰处理
const scheduled = backgroundTasks.map(task => ({
taskId: task.id,
scheduledFor: new Date(Date.now() + 3600000), // 1小时后
agent: 'background-agent'
}));
return {
metadata: {
backgroundScheduled: scheduled.length,
scheduledTasks: scheduled
}
};
});
// 基于任务存在的条件路由
workflow.addConditionalEdges('route', (state) => {
const { routedTasks } = state;
const edges = [];
if (routedTasks.critical?.length > 0) edges.push('process-critical');
if (routedTasks.standard?.length > 0) edges.push('process-standard');
if (routedTasks.background?.length > 0) edges.push('schedule-background');
return edges.length > 0 ? edges : [END];
});
workflow.addEdge('process-critical', END);
workflow.addEdge('process-standard', END);
workflow.addEdge('schedule-background', END);
workflow.setEntryPoint('route');
return workflow.compile();
}
}
实现复杂的多代理路由,为每个优先级级别配备不同的模型和处理策略。
3. 动态优先级调整系统
// lib/priority/dynamic-adjuster.ts
import { Task, Priority } from '@/types/priority';
import { mean, standardDeviation } from 'es-toolkit';
import { kv } from '@vercel/kv';
interface TaskMetrics {
taskId: string;
attempts: number;
failureRate: number;
averageProcessingTime: number;
lastAttempt: Date;
ageInHours: number;
}
export class DynamicPriorityAdjuster {
private readonly AGING_FACTOR = 0.1; // 每小时优先级提升
private readonly FAILURE_THRESHOLD = 0.3; // 30%失败率触发提升
private readonly STARVATION_HOURS = 4; // 防饥饿机制启动前的小时数
async adjustPriority(task: Task, metrics: TaskMetrics): Promise<Priority> {
let priorityScore = task.priority;
// 基于年龄的调整(防止饥饿)
if (metrics.ageInHours > this.STARVATION_HOURS) {
const ageBoost = Math.floor(
(metrics.ageInHours - this.STARVATION_HOURS) * this.AGING_FACTOR
);
priorityScore = Math.max(0, priorityScore - ageBoost);
}
// 失败率调整
if (metrics.failureRate > this.FAILURE_THRESHOLD) {
priorityScore = Math.max(0, priorityScore - 1);
}
// 截止日期接近度调整
if (task.deadline) {
const hoursUntilDeadline =
(task.deadline.getTime() - Date.now()) / 3600000;
if (hoursUntilDeadline < 24) {
priorityScore = Priority.HIGH;
}
if (hoursUntilDeadline < 4) {
priorityScore = Priority.CRITICAL;
}
}
// 存储调整历史
await this.storeAdjustment(task.id, task.priority, priorityScore);
return priorityScore as Priority;
}
async calculateSystemLoad(): Promise<{
load: 'low' | 'normal' | 'high' | 'critical';
metrics: Record<string, number>;
}> {
// 从KV存储获取最近的任务指标
const recentTasks = await kv.lrange('task-queue', 0, 100);
if (!recentTasks || recentTasks.length === 0) {
return { load: 'low', metrics: {} };
}
const priorities = recentTasks.map((t: any) => t.priority);
const criticalCount = priorities.filter(p => p === Priority.CRITICAL).length;
const highCount = priorities.filter(p => p === Priority.HIGH).length;
const avgPriority = mean(priorities);
const stdDev = standardDeviation(priorities);
// 确定系统负载
let load: 'low' | 'normal' | 'high' | 'critical';
if (criticalCount > 10 || avgPriority < 1) {
load = 'critical';
} else if (highCount > 20 || avgPriority < 2) {
load = 'high';
} else if (avgPriority < 3) {
load = 'normal';
} else {
load = 'low';
}
return {
load,
metrics: {
averagePriority: avgPriority,
standardDeviation: stdDev,
criticalTasks: criticalCount,
highTasks: highCount,
totalTasks: recentTasks.length,
}
};
}
async optimizeThroughput(tasks: Task[]): Promise<Task[]> {
const systemLoad = await this.calculateSystemLoad();
// 根据负载应用不同的策略
switch (systemLoad.load) {
case 'critical':
// 仅专注于关键任务
return tasks
.filter(t => t.priority <= Priority.HIGH)
.sort((a, b) => a.priority - b.priority);
case 'high':
// 平衡关键和高优先级
return tasks
.filter(t => t.priority <= Priority.MEDIUM)
.sort((a, b) => {
// 交替高优先级与一些中等优先级
if (a.priority === b.priority) return 0;
if (a.priority === Priority.CRITICAL) return -1;
if (b.priority === Priority.CRITICAL) return 1;
// 以3:1比例混合高和中优先级
return Math.random() > 0.25 ?
a.priority - b.priority :
b.priority - a.priority;
});
case 'normal':
// 标准优先级排序
return tasks.sort((a, b) => a.priority - b.priority);
case 'low':
// 处理所有任务,提升后台工作
return tasks.sort((a, b) => {
// 给后台任务一个机会
if (a.priority === Priority.BACKGROUND &&
b.priority === Priority.LOW) {
return Math.random() > 0.7 ? -1 : 1;
}
return a.priority - b.priority;
});
}
}
private async storeAdjustment(
taskId: string,
originalPriority: Priority,
adjustedPriority: Priority
): Promise<void> {
const adjustment = {
taskId,
originalPriority: Priority[originalPriority],
adjustedPriority: Priority[adjustedPriority],
timestamp: new Date().toISOString(),
reason: this.determineAdjustmentReason(originalPriority, adjustedPriority)
};
await kv.lpush('priority-adjustments', adjustment);
await kv.ltrim('priority-adjustments', 0, 999); // 保留最近1000条
}
private determineAdjustmentReason(
original: Priority,
adjusted: Priority
): string {
if (adjusted < original) {
return 'priority_increased';
} else if (adjusted > original) {
return 'priority_decreased';
}
return 'no_change';
}
}
实现动态优先级调整,包括防老化、基于失败的提升和负载感知的吞吐量优化。
4. 流式优先级更新
// app/api/priority/stream/route.ts
import { DynamicPriorityAdjuster } from '@/lib/priority/dynamic-adjuster';
import { PriorityRouter } from '@/lib/agents/priority-router';
import { Task } from '@/types/priority';
export const runtime = 'nodejs';
export const maxDuration = 300;
export async function GET(req: Request) {
const encoder = new TextEncoder();
const stream = new TransformStream();
const writer = stream.writable.getWriter();
const adjuster = new DynamicPriorityAdjuster();
const router = new PriorityRouter();
const workflow = router.createRouterWorkflow();
// 每秒流式传输优先级更新
const interval = setInterval(async () => {
try {
const systemLoad = await adjuster.calculateSystemLoad();
const update = {
timestamp: new Date().toISOString(),
load: systemLoad.load,
metrics: systemLoad.metrics,
recommendation: getRecommendation(systemLoad.load)
};
await writer.write(
encoder.encode(`data: ${JSON.stringify(update)}\n\n`)
);
} catch (error) {
console.error('流错误:', error);
}
}, 1000);
// 客户端断开连接时清理
req.signal.addEventListener('abort', () => {
clearInterval(interval);
writer.close();
});
return new Response(stream.readable, {
headers: {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
},
});
}
function getRecommendation(load: string): string {
switch (load) {
case 'critical':
return '系统过载。考虑扩展代理或推迟低优先级任务。';
case 'high':
return '检测到高负载。密切监控并准备扩展。';
case 'normal':
return '系统正常运行。';
case 'low':
return '利用率低。进行维护或后台任务的好时机。';
default:
return '未知的系统状态。';
}
}
通过SSE流提供实时优先级系统监控,使仪表板能够显示实时系统负载和建议。
5. 前端优先级仪表板
// components/PriorityDashboard.tsx
'use client';
import { useQuery, useMutation } from '@tanstack/react-query';
import { useState, useEffect } from 'react';
import { Task, Priority } from '@/types/priority';
import { groupBy, sortBy } from 'es-toolkit';
export default function PriorityDashboard() {
const [tasks, setTasks] = useState<Task[]>([]);
const [systemLoad, setSystemLoad] = useState<any>(null);
const [newTask, setNewTask] = useState('');
// 提交新任务
const submitTask = useMutation({
mutationFn: async (description: string) => {
const res = await fetch('/api/priority', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ task: description }),
});
return res.json();
},
onSuccess: (data) => {
if (data.taskAdded) {
setTasks(prev => [...prev, data.taskAdded]);
}
setNewTask('');
},
});
// 流式系统更新
useEffect(() => {
const eventSource = new EventSource('/api/priority/stream');
eventSource.onmessage = (event) => {
const data = JSON.parse(event.data);
setSystemLoad(data);
};
return () => eventSource.close();
}, []);
// 按优先级分组任务
const groupedTasks = groupBy(tasks, t => Priority[t.priority]);
return (
<div className="p-6 max-w-6xl mx-auto">
<div className="mb-8">
<h1 className="text-3xl font-bold mb-4">优先级管理系统</h1>
{/* 系统负载指示器 */}
{systemLoad && (
<div className={`alert ${
systemLoad.load === 'critical' ? 'alert-error' :
systemLoad.load === 'high' ? 'alert-warning' :
'alert-info'
} mb-4`}>
<div>
<h3 className="font-bold">系统负载: {systemLoad.load.toUpperCase()}</h3>
<p className="text-sm">{systemLoad.recommendation}</p>
<div className="stats stats-horizontal mt-2 bg-transparent">
<div className="stat px-2">
<div className="stat-title text-xs">关键</div>
<div className="stat-value text-lg">
{systemLoad.metrics?.criticalTasks || 0}
</div>
</div>
<div className="stat px-2">
<div className="stat-title text-xs">高</div>
<div className="stat-value text-lg">
{systemLoad.metrics?.highTasks || 0}
</div>
</div>
<div className="stat px-2">
<div className="stat-title text-xs">总计</div>
<div className="stat-value text-lg">
{systemLoad.metrics?.totalTasks || 0}
</div>
</div>
</div>
</div>
</div>
)}
{/* 任务输入 */}
<div className="form-control mb-6">
<div className="input-group">
<input
type="text"
placeholder="描述一个新任务..."
className="input input-bordered flex-1"
value={newTask}
onChange={(e) => setNewTask(e.target.value)}
onKeyPress={(e) => {
if (e.key === 'Enter' && newTask.trim()) {
submitTask.mutate(newTask);
}
}}
/>
<button
className="btn btn-primary"
onClick={() => newTask.trim() && submitTask.mutate(newTask)}
disabled={submitTask.isPending || !newTask.trim()}
>
{submitTask.isPending ? (
<span className="loading loading-spinner"></span>
) : (
'添加任务'
)}
</button>
</div>
</div>
{/* 优先级泳道 */}
<div className="grid grid-cols-1 md:grid-cols-2 lg:grid-cols-4 gap-4">
{Object.entries(Priority)
.filter(([key]) => isNaN(Number(key)))
.map(([priorityName, priorityValue]) => (
<div key={priorityName} className="card bg-base-100 shadow-xl">
<div className="card-body p-4">
<h2 className={`card-title text-sm ${
priorityName === 'CRITICAL' ? 'text-error' :
priorityName === 'HIGH' ? 'text-warning' :
priorityName === 'MEDIUM' ? 'text-info' :
'text-base-content'
}`}>
{priorityName}
<div className="badge badge-sm">
{groupedTasks[priorityName]?.length || 0}
</div>
</h2>
<div className="space-y-2 mt-2">
{(groupedTasks[priorityName] || [])
.slice(0, 5)
.map(task => (
<div
key={task.id}
className="text-xs p-2 bg-base-200 rounded"
>
<div className="font-medium truncate">
{task.description}
</div>
{task.deadline && (
<div className="text-xs opacity-70 mt-1">
到期: {new Date(task.deadline).toLocaleDateString()}
</div>
)}
</div>
))}
{groupedTasks[priorityName]?.length > 5 && (
<div className="text-xs opacity-50 text-center">
+{groupedTasks[priorityName].length - 5} 更多
</div>
)}
</div>
</div>
</div>
))}
</div>
</div>
</div>
);
}
创建一个交互式仪表板,显示实时优先级泳道、系统负载指示器和带有视觉反馈的任务提交。
结论
智能体设计模式中的优先级排序通过引入适应现实世界约束的智能任务管理,改变了AI系统处理复杂工作负载的方式。通过结合基于堆的优先级队列、LLM驱动的优先级分配和动态调整算法,我们创建了反映人类决策同时以机器规模运行的系统。基本实现提供了具有多标准排序的高效O(log n)操作,而高级示例展示了包括加权评分、多代理路由和实时负载监控在内的生产就绪功能。
这些模式在Vercel等无服务器环境中表现出色,其中无状态执行和快速扩展至关重要。通过利用LangGraph的有状态工作流和es-toolkit的优化工具,系统即使在高负载下也能保持性能,同时提供根据不断变化的业务条件调整优先级的灵活性。关键见解是,有效的优先级排序不仅仅是对任务排序 - 而是创建自适应系统,平衡紧急性、影响和资源可用性,以最大化整体系统效率。