DRAFT Agentic Design Patterns - Prioritization
Learn how to build intelligent task prioritization systems using LangChain and LangGraph in TypeScript, optimized for Vercel's serverless platform. This guide demonstrates implementing dynamic priority queues, weighted scoring algorithms, and adaptive task management for production AI agents.
Mental Model: The Emergency Room Triage System
Think of agent prioritization like an emergency room triage system. Just as medical staff assess incoming patients based on severity, urgency, and available resources, your AI agents must evaluate tasks using multiple criteria. In a hospital, a patient with chest pain gets immediate attention while someone with a minor cut waits - similarly, your agent prioritizes critical customer issues over routine inquiries. The triage nurse (your prioritization algorithm) continuously re-evaluates as new patients arrive (dynamic re-prioritization), and specialized teams handle different severity levels (priority-based routing). This mental model helps you understand why simple FIFO queues fail in complex scenarios and how intelligent prioritization dramatically improves system effectiveness.
Basic Example: Task Priority Queue with LangGraph
1. Define Priority State and Types
// types/priority.ts
import { z } from 'zod';
import { BaseMessage } from '@langchain/core/messages';
// Priority levels using enum for type safety
export enum Priority {
CRITICAL = 0, // Highest priority
HIGH = 1,
MEDIUM = 2,
LOW = 3,
BACKGROUND = 4 // Lowest priority
}
// Task schema with Zod validation
export const TaskSchema = z.object({
id: z.string(),
description: z.string(),
priority: z.nativeEnum(Priority),
deadline: z.date().optional(),
dependencies: z.array(z.string()).default([]),
assignee: z.string().optional(),
createdAt: z.date(),
metadata: z.record(z.any()).default({})
});
export type Task = z.infer<typeof TaskSchema>;
// State for LangGraph workflow
export interface PriorityState {
tasks: Task[];
currentTask: Task | null;
completedTasks: string[];
messages: BaseMessage[];
metadata: Record<string, any>;
}
Defines strongly-typed priority levels and task structure with Zod validation, ensuring type safety across the entire prioritization system.
2. Implement Priority Queue with Heap
// lib/priority-queue.ts
import { sortBy, remove } from 'es-toolkit';
import { Task, Priority } from '@/types/priority';
export class PriorityQueue {
private tasks: Task[] = [];
private maxSize: number = 10000;
// Add task maintaining heap property
enqueue(task: Task): void {
if (this.tasks.length >= this.maxSize) {
// Remove lowest priority old task
const oldTasks = this.tasks.filter(t =>
t.priority === Priority.BACKGROUND &&
Date.now() - t.createdAt.getTime() > 3600000 // 1 hour
);
if (oldTasks.length > 0) {
remove(this.tasks, t => t.id === oldTasks[0].id);
}
}
this.tasks.push(task);
this.heapifyUp();
}
// Extract highest priority task
dequeue(): Task | null {
if (this.tasks.length === 0) return null;
const task = this.tasks[0];
const last = this.tasks.pop();
if (this.tasks.length > 0 && last) {
this.tasks[0] = last;
this.heapifyDown();
}
return task;
}
private heapifyUp(): void {
let index = this.tasks.length - 1;
while (index > 0) {
const parentIndex = Math.floor((index - 1) / 2);
if (this.compare(this.tasks[index], this.tasks[parentIndex]) >= 0) {
break;
}
[this.tasks[index], this.tasks[parentIndex]] =
[this.tasks[parentIndex], this.tasks[index]];
index = parentIndex;
}
}
private heapifyDown(): void {
let index = 0;
while (2 * index + 1 < this.tasks.length) {
const leftChild = 2 * index + 1;
const rightChild = 2 * index + 2;
let smallest = index;
if (this.compare(this.tasks[leftChild], this.tasks[smallest]) < 0) {
smallest = leftChild;
}
if (rightChild < this.tasks.length &&
this.compare(this.tasks[rightChild], this.tasks[smallest]) < 0) {
smallest = rightChild;
}
if (smallest === index) break;
[this.tasks[index], this.tasks[smallest]] =
[this.tasks[smallest], this.tasks[index]];
index = smallest;
}
}
// Compare tasks for priority ordering
private compare(a: Task, b: Task): number {
// First by priority level
if (a.priority !== b.priority) {
return a.priority - b.priority;
}
// Then by deadline if exists
if (a.deadline && b.deadline) {
return a.deadline.getTime() - b.deadline.getTime();
}
if (a.deadline) return -1;
if (b.deadline) return 1;
// Finally by creation time (FIFO for same priority)
return a.createdAt.getTime() - b.createdAt.getTime();
}
// Get all tasks sorted by priority
getTasks(): Task[] {
return sortBy(this.tasks, [
t => t.priority,
t => t.deadline?.getTime() || Infinity,
t => t.createdAt.getTime()
]);
}
}
Implements an efficient min-heap priority queue with O(log n) operations, automatic cleanup of old low-priority tasks, and multi-criteria comparison.
3. Create Priority Assignment Agent
// lib/agents/priority-agent.ts
import { ChatGoogleGenerativeAI } from '@langchain/google-genai';
import { HumanMessage, SystemMessage } from '@langchain/core/messages';
import { Priority, Task } from '@/types/priority';
import { chunk } from 'es-toolkit';
export class PriorityAssignmentAgent {
private model: ChatGoogleGenerativeAI;
constructor() {
this.model = new ChatGoogleGenerativeAI({
modelName: 'gemini-2.5-flash',
temperature: 0,
maxOutputTokens: 1024,
});
}
async assignPriority(taskDescription: string): Promise<Priority> {
const systemPrompt = `You are a task prioritization expert. Analyze the task and assign a priority level.
Priority levels:
- CRITICAL (0): System failures, security breaches, data loss risks, production down
- HIGH (1): Customer-facing issues, revenue impact, SLA violations
- MEDIUM (2): Important features, non-critical bugs, standard requests
- LOW (3): Nice-to-have features, minor improvements, documentation
- BACKGROUND (4): Cleanup tasks, optimizations, research
Respond with ONLY the priority level name.`;
const response = await this.model.invoke([
new SystemMessage(systemPrompt),
new HumanMessage(`Task: ${taskDescription}`)
]);
const content = response.content.toString().trim().toUpperCase();
// Map response to Priority enum
switch (content) {
case 'CRITICAL': return Priority.CRITICAL;
case 'HIGH': return Priority.HIGH;
case 'MEDIUM': return Priority.MEDIUM;
case 'LOW': return Priority.LOW;
case 'BACKGROUND': return Priority.BACKGROUND;
default: return Priority.MEDIUM; // Safe default
}
}
async batchAssignPriorities(tasks: string[]): Promise<Priority[]> {
// Process in chunks to avoid rate limits
const chunks = chunk(tasks, 5);
const results: Priority[] = [];
for (const batch of chunks) {
const promises = batch.map(task => this.assignPriority(task));
const priorities = await Promise.all(promises);
results.push(...priorities);
}
return results;
}
}
Uses Gemini Flash to intelligently analyze task descriptions and assign appropriate priority levels based on business impact and urgency.
4. Build LangGraph Workflow
// lib/workflows/priority-workflow.ts
import { StateGraph, END } from '@langchain/langgraph';
import { PriorityState, Task, Priority } from '@/types/priority';
import { PriorityQueue } from '@/lib/priority-queue';
import { PriorityAssignmentAgent } from '@/lib/agents/priority-agent';
import { v4 as uuidv4 } from 'uuid';
export function createPriorityWorkflow() {
const queue = new PriorityQueue();
const agent = new PriorityAssignmentAgent();
const workflow = new StateGraph<PriorityState>({
channels: {
tasks: {
value: (x: Task[], y: Task[]) => [...x, ...y],
default: () => [],
},
currentTask: {
value: (x: Task | null, y: Task | null) => y || x,
default: () => null,
},
completedTasks: {
value: (x: string[], y: string[]) => [...x, ...y],
default: () => [],
},
messages: {
value: (x, y) => [...x, ...y],
default: () => [],
},
metadata: {
value: (x, y) => ({...x, ...y}),
default: () => ({}),
},
},
});
// Node: Receive and prioritize new tasks
workflow.addNode('receive', async (state) => {
const { messages } = state;
const lastMessage = messages[messages.length - 1];
if (!lastMessage?.content) {
return { metadata: { error: 'No task description provided' } };
}
const taskDescription = lastMessage.content.toString();
const priority = await agent.assignPriority(taskDescription);
const newTask: Task = {
id: uuidv4(),
description: taskDescription,
priority,
createdAt: new Date(),
dependencies: [],
metadata: {}
};
queue.enqueue(newTask);
return {
tasks: [newTask],
metadata: {
lastAdded: newTask.id,
queueSize: queue.getTasks().length
}
};
});
// Node: Select next task from queue
workflow.addNode('select', async (state) => {
const nextTask = queue.dequeue();
if (!nextTask) {
return {
currentTask: null,
metadata: { status: 'queue_empty' }
};
}
return {
currentTask: nextTask,
metadata: {
status: 'task_selected',
selectedId: nextTask.id,
selectedPriority: Priority[nextTask.priority]
}
};
});
// Node: Process current task
workflow.addNode('process', async (state) => {
const { currentTask } = state;
if (!currentTask) {
return { metadata: { error: 'No task to process' } };
}
// Simulate task processing
await new Promise(resolve => setTimeout(resolve, 100));
return {
completedTasks: [currentTask.id],
currentTask: null,
metadata: {
status: 'task_completed',
completedId: currentTask.id
}
};
});
// Define edges
workflow.addEdge('receive', 'select');
workflow.addEdge('select', 'process');
workflow.addEdge('process', END);
workflow.setEntryPoint('receive');
return workflow.compile();
}
Creates a stateful workflow that receives tasks, assigns priorities, manages queue selection, and processes tasks in priority order.
5. API Route for Priority Queue
// app/api/priority/route.ts
import { createPriorityWorkflow } from '@/lib/workflows/priority-workflow';
import { HumanMessage } from '@langchain/core/messages';
import { NextResponse } from 'next/server';
export const runtime = 'nodejs';
export const maxDuration = 60;
export async function POST(req: Request) {
try {
const { task } = await req.json();
if (!task) {
return NextResponse.json(
{ error: 'Task description required' },
{ status: 400 }
);
}
const workflow = createPriorityWorkflow();
const result = await workflow.invoke({
tasks: [],
currentTask: null,
completedTasks: [],
messages: [new HumanMessage(task)],
metadata: {}
});
return NextResponse.json({
taskAdded: result.tasks[0],
queueSize: result.metadata.queueSize,
status: result.metadata.status
});
} catch (error) {
console.error('Priority workflow error:', error);
return NextResponse.json(
{ error: 'Failed to process task' },
{ status: 500 }
);
}
}
Exposes the priority workflow as a REST API endpoint that accepts task descriptions and returns priority assignments with queue status.
Advanced Example: Dynamic Multi-Agent Priority System
1. Weighted Scoring with Multiple Criteria
// lib/scoring/weighted-scorer.ts
import { Task } from '@/types/priority';
import { sum, map, zip } from 'es-toolkit';
interface ScoringCriteria {
urgency: number; // 0-10 scale
impact: number; // 0-10 scale
effort: number; // 0-10 scale (inverse)
confidence: number; // 0-10 scale
customerValue: number; // 0-10 scale
}
interface ScoringWeights {
urgency: number;
impact: number;
effort: number;
confidence: number;
customerValue: number;
}
export class WeightedPriorityScorer {
private weights: ScoringWeights;
constructor(weights?: Partial<ScoringWeights>) {
// Default weights sum to 1.0
this.weights = {
urgency: weights?.urgency ?? 0.3,
impact: weights?.impact ?? 0.25,
effort: weights?.effort ?? 0.15,
confidence: weights?.confidence ?? 0.15,
customerValue: weights?.customerValue ?? 0.15,
};
}
// Calculate weighted priority score
calculateScore(criteria: ScoringCriteria): number {
const scores = [
criteria.urgency * this.weights.urgency,
criteria.impact * this.weights.impact,
(10 - criteria.effort) * this.weights.effort, // Invert effort
criteria.confidence * this.weights.confidence,
criteria.customerValue * this.weights.customerValue,
];
return sum(scores);
}
// Extract scoring criteria from task using LLM
async extractCriteria(
task: Task,
model: ChatGoogleGenerativeAI
): Promise<ScoringCriteria> {
const prompt = `Analyze this task and rate each criterion from 0-10:
Task: ${task.description}
Criteria:
- Urgency: How time-sensitive is this? (10 = immediate, 0 = can wait)
- Impact: Business/user impact? (10 = critical, 0 = minimal)
- Effort: Implementation complexity? (10 = very complex, 0 = trivial)
- Confidence: How clear are requirements? (10 = crystal clear, 0 = vague)
- CustomerValue: Direct customer benefit? (10 = high value, 0 = internal only)
Respond in JSON format: {"urgency": X, "impact": Y, ...}`;
const response = await model.invoke([new HumanMessage(prompt)]);
const content = response.content.toString();
try {
const parsed = JSON.parse(content);
return {
urgency: parsed.urgency || 5,
impact: parsed.impact || 5,
effort: parsed.effort || 5,
confidence: parsed.confidence || 5,
customerValue: parsed.customerValue || 5,
};
} catch {
// Default to medium scores on parse error
return {
urgency: 5,
impact: 5,
effort: 5,
confidence: 5,
customerValue: 5,
};
}
}
// Dynamically adjust weights based on context
adjustWeights(context: {
isEmergency?: boolean;
isCustomerFacing?: boolean;
resourcesLimited?: boolean;
}): void {
if (context.isEmergency) {
this.weights.urgency = 0.5;
this.weights.impact = 0.3;
this.normalizeWeights();
}
if (context.isCustomerFacing) {
this.weights.customerValue = 0.3;
this.normalizeWeights();
}
if (context.resourcesLimited) {
this.weights.effort = 0.25;
this.normalizeWeights();
}
}
private normalizeWeights(): void {
const total = sum(Object.values(this.weights));
for (const key in this.weights) {
this.weights[key as keyof ScoringWeights] /= total;
}
}
}
Implements RICE-style weighted scoring with dynamic weight adjustment based on business context and LLM-based criteria extraction.
2. Multi-Agent Priority Router
// lib/agents/priority-router.ts
import { StateGraph, END } from '@langchain/langgraph';
import { ChatGoogleGenerativeAI } from '@langchain/google-genai';
import { Priority, Task, PriorityState } from '@/types/priority';
import { groupBy, maxBy } from 'es-toolkit';
interface RouterState extends PriorityState {
routedTasks: Record<string, Task[]>;
agentAssignments: Record<string, string>;
}
export class PriorityRouter {
private criticalAgent: ChatGoogleGenerativeAI;
private standardAgent: ChatGoogleGenerativeAI;
private backgroundAgent: ChatGoogleGenerativeAI;
constructor() {
// Different models for different priority levels
this.criticalAgent = new ChatGoogleGenerativeAI({
modelName: 'gemini-2.5-pro',
temperature: 0,
maxOutputTokens: 4096,
});
this.standardAgent = new ChatGoogleGenerativeAI({
modelName: 'gemini-2.5-flash',
temperature: 0.3,
maxOutputTokens: 2048,
});
this.backgroundAgent = new ChatGoogleGenerativeAI({
modelName: 'gemini-2.5-flash',
temperature: 0.5,
maxOutputTokens: 1024,
});
}
createRouterWorkflow() {
const workflow = new StateGraph<RouterState>({
channels: {
tasks: {
value: (x, y) => [...x, ...y],
default: () => [],
},
routedTasks: {
value: (x, y) => ({...x, ...y}),
default: () => ({}),
},
agentAssignments: {
value: (x, y) => ({...x, ...y}),
default: () => ({}),
},
currentTask: {
value: (x, y) => y || x,
default: () => null,
},
completedTasks: {
value: (x, y) => [...x, ...y],
default: () => [],
},
messages: {
value: (x, y) => [...x, ...y],
default: () => [],
},
metadata: {
value: (x, y) => ({...x, ...y}),
default: () => ({}),
},
},
});
// Route tasks to appropriate queues
workflow.addNode('route', async (state) => {
const { tasks } = state;
// Group tasks by priority
const grouped = groupBy(tasks, t => Priority[t.priority]);
const routed: Record<string, Task[]> = {
critical: grouped['CRITICAL'] || [],
high: grouped['HIGH'] || [],
standard: [...(grouped['MEDIUM'] || []), ...(grouped['LOW'] || [])],
background: grouped['BACKGROUND'] || [],
};
// Assign agents based on priority
const assignments: Record<string, string> = {};
for (const task of tasks) {
if (task.priority === Priority.CRITICAL) {
assignments[task.id] = 'critical-agent';
} else if (task.priority === Priority.HIGH) {
assignments[task.id] = 'high-priority-agent';
} else if (task.priority <= Priority.LOW) {
assignments[task.id] = 'standard-agent';
} else {
assignments[task.id] = 'background-agent';
}
}
return {
routedTasks: routed,
agentAssignments: assignments,
metadata: {
criticalCount: routed.critical.length,
highCount: routed.high.length,
standardCount: routed.standard.length,
backgroundCount: routed.background.length,
}
};
});
// Process critical tasks with dedicated resources
workflow.addNode('process-critical', async (state) => {
const { routedTasks } = state;
const criticalTasks = routedTasks.critical || [];
if (criticalTasks.length === 0) {
return { metadata: { criticalStatus: 'none' } };
}
// Process with high-performance model
const results = await Promise.all(
criticalTasks.map(async task => {
const response = await this.criticalAgent.invoke([
new HumanMessage(`Critical task: ${task.description}`)
]);
return {
taskId: task.id,
result: response.content,
processingTime: Date.now()
};
})
);
return {
completedTasks: criticalTasks.map(t => t.id),
metadata: {
criticalProcessed: results.length,
criticalResults: results
}
};
});
// Process standard tasks in batches
workflow.addNode('process-standard', async (state) => {
const { routedTasks } = state;
const standardTasks = routedTasks.standard || [];
if (standardTasks.length === 0) {
return { metadata: { standardStatus: 'none' } };
}
// Batch process for efficiency
const batchSize = 5;
const completed: string[] = [];
for (let i = 0; i < standardTasks.length; i += batchSize) {
const batch = standardTasks.slice(i, i + batchSize);
const batchPromises = batch.map(task =>
this.standardAgent.invoke([
new HumanMessage(`Task: ${task.description}`)
])
);
await Promise.all(batchPromises);
completed.push(...batch.map(t => t.id));
}
return {
completedTasks: completed,
metadata: { standardProcessed: completed.length }
};
});
// Schedule background tasks
workflow.addNode('schedule-background', async (state) => {
const { routedTasks } = state;
const backgroundTasks = routedTasks.background || [];
if (backgroundTasks.length === 0) {
return { metadata: { backgroundStatus: 'none' } };
}
// Queue for off-peak processing
const scheduled = backgroundTasks.map(task => ({
taskId: task.id,
scheduledFor: new Date(Date.now() + 3600000), // 1 hour later
agent: 'background-agent'
}));
return {
metadata: {
backgroundScheduled: scheduled.length,
scheduledTasks: scheduled
}
};
});
// Conditional routing based on task presence
workflow.addConditionalEdges('route', (state) => {
const { routedTasks } = state;
const edges = [];
if (routedTasks.critical?.length > 0) edges.push('process-critical');
if (routedTasks.standard?.length > 0) edges.push('process-standard');
if (routedTasks.background?.length > 0) edges.push('schedule-background');
return edges.length > 0 ? edges : [END];
});
workflow.addEdge('process-critical', END);
workflow.addEdge('process-standard', END);
workflow.addEdge('schedule-background', END);
workflow.setEntryPoint('route');
return workflow.compile();
}
}
Implements sophisticated multi-agent routing with different models and processing strategies for each priority level.
3. Dynamic Priority Adjustment System
// lib/priority/dynamic-adjuster.ts
import { Task, Priority } from '@/types/priority';
import { mean, standardDeviation } from 'es-toolkit';
import { kv } from '@vercel/kv';
interface TaskMetrics {
taskId: string;
attempts: number;
failureRate: number;
averageProcessingTime: number;
lastAttempt: Date;
ageInHours: number;
}
export class DynamicPriorityAdjuster {
private readonly AGING_FACTOR = 0.1; // Priority boost per hour
private readonly FAILURE_THRESHOLD = 0.3; // 30% failure rate triggers boost
private readonly STARVATION_HOURS = 4; // Hours before anti-starvation kicks in
async adjustPriority(task: Task, metrics: TaskMetrics): Promise<Priority> {
let priorityScore = task.priority;
// Age-based adjustment (prevent starvation)
if (metrics.ageInHours > this.STARVATION_HOURS) {
const ageBoost = Math.floor(
(metrics.ageInHours - this.STARVATION_HOURS) * this.AGING_FACTOR
);
priorityScore = Math.max(0, priorityScore - ageBoost);
}
// Failure rate adjustment
if (metrics.failureRate > this.FAILURE_THRESHOLD) {
priorityScore = Math.max(0, priorityScore - 1);
}
// Deadline proximity adjustment
if (task.deadline) {
const hoursUntilDeadline =
(task.deadline.getTime() - Date.now()) / 3600000;
if (hoursUntilDeadline < 24) {
priorityScore = Priority.HIGH;
}
if (hoursUntilDeadline < 4) {
priorityScore = Priority.CRITICAL;
}
}
// Store adjustment history
await this.storeAdjustment(task.id, task.priority, priorityScore);
return priorityScore as Priority;
}
async calculateSystemLoad(): Promise<{
load: 'low' | 'normal' | 'high' | 'critical';
metrics: Record<string, number>;
}> {
// Fetch recent task metrics from KV store
const recentTasks = await kv.lrange('task-queue', 0, 100);
if (!recentTasks || recentTasks.length === 0) {
return { load: 'low', metrics: {} };
}
const priorities = recentTasks.map((t: any) => t.priority);
const criticalCount = priorities.filter(p => p === Priority.CRITICAL).length;
const highCount = priorities.filter(p => p === Priority.HIGH).length;
const avgPriority = mean(priorities);
const stdDev = standardDeviation(priorities);
// Determine system load
let load: 'low' | 'normal' | 'high' | 'critical';
if (criticalCount > 10 || avgPriority < 1) {
load = 'critical';
} else if (highCount > 20 || avgPriority < 2) {
load = 'high';
} else if (avgPriority < 3) {
load = 'normal';
} else {
load = 'low';
}
return {
load,
metrics: {
averagePriority: avgPriority,
standardDeviation: stdDev,
criticalTasks: criticalCount,
highTasks: highCount,
totalTasks: recentTasks.length,
}
};
}
async optimizeThroughput(tasks: Task[]): Promise<Task[]> {
const systemLoad = await this.calculateSystemLoad();
// Apply different strategies based on load
switch (systemLoad.load) {
case 'critical':
// Focus only on critical tasks
return tasks
.filter(t => t.priority <= Priority.HIGH)
.sort((a, b) => a.priority - b.priority);
case 'high':
// Balance critical and high priority
return tasks
.filter(t => t.priority <= Priority.MEDIUM)
.sort((a, b) => {
// Interleave high priority with some medium
if (a.priority === b.priority) return 0;
if (a.priority === Priority.CRITICAL) return -1;
if (b.priority === Priority.CRITICAL) return 1;
// Mix high and medium at 3:1 ratio
return Math.random() > 0.25 ?
a.priority - b.priority :
b.priority - a.priority;
});
case 'normal':
// Standard priority ordering
return tasks.sort((a, b) => a.priority - b.priority);
case 'low':
// Process all tasks, boost background work
return tasks.sort((a, b) => {
// Give background tasks a chance
if (a.priority === Priority.BACKGROUND &&
b.priority === Priority.LOW) {
return Math.random() > 0.7 ? -1 : 1;
}
return a.priority - b.priority;
});
}
}
private async storeAdjustment(
taskId: string,
originalPriority: Priority,
adjustedPriority: Priority
): Promise<void> {
const adjustment = {
taskId,
originalPriority: Priority[originalPriority],
adjustedPriority: Priority[adjustedPriority],
timestamp: new Date().toISOString(),
reason: this.determineAdjustmentReason(originalPriority, adjustedPriority)
};
await kv.lpush('priority-adjustments', adjustment);
await kv.ltrim('priority-adjustments', 0, 999); // Keep last 1000
}
private determineAdjustmentReason(
original: Priority,
adjusted: Priority
): string {
if (adjusted < original) {
return 'priority_increased';
} else if (adjusted > original) {
return 'priority_decreased';
}
return 'no_change';
}
}
Implements dynamic priority adjustment with aging prevention, failure-based boosting, and load-aware throughput optimization.
4. Streaming Priority Updates
// app/api/priority/stream/route.ts
import { DynamicPriorityAdjuster } from '@/lib/priority/dynamic-adjuster';
import { PriorityRouter } from '@/lib/agents/priority-router';
import { Task } from '@/types/priority';
export const runtime = 'nodejs';
export const maxDuration = 300;
export async function GET(req: Request) {
const encoder = new TextEncoder();
const stream = new TransformStream();
const writer = stream.writable.getWriter();
const adjuster = new DynamicPriorityAdjuster();
const router = new PriorityRouter();
const workflow = router.createRouterWorkflow();
// Stream priority updates every second
const interval = setInterval(async () => {
try {
const systemLoad = await adjuster.calculateSystemLoad();
const update = {
timestamp: new Date().toISOString(),
load: systemLoad.load,
metrics: systemLoad.metrics,
recommendation: getRecommendation(systemLoad.load)
};
await writer.write(
encoder.encode(`data: ${JSON.stringify(update)}\n\n`)
);
} catch (error) {
console.error('Stream error:', error);
}
}, 1000);
// Clean up on client disconnect
req.signal.addEventListener('abort', () => {
clearInterval(interval);
writer.close();
});
return new Response(stream.readable, {
headers: {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
},
});
}
function getRecommendation(load: string): string {
switch (load) {
case 'critical':
return 'System overloaded. Consider scaling agents or deferring low-priority tasks.';
case 'high':
return 'High load detected. Monitor closely and prepare to scale.';
case 'normal':
return 'System operating normally.';
case 'low':
return 'Low utilization. Good time for maintenance or background tasks.';
default:
return 'Unknown system state.';
}
}
Provides real-time priority system monitoring with SSE streaming, enabling dashboards to display live system load and recommendations.
5. Frontend Priority Dashboard
// components/PriorityDashboard.tsx
'use client';
import { useQuery, useMutation } from '@tanstack/react-query';
import { useState, useEffect } from 'react';
import { Task, Priority } from '@/types/priority';
import { groupBy, sortBy } from 'es-toolkit';
export default function PriorityDashboard() {
const [tasks, setTasks] = useState<Task[]>([]);
const [systemLoad, setSystemLoad] = useState<any>(null);
const [newTask, setNewTask] = useState('');
// Submit new task
const submitTask = useMutation({
mutationFn: async (description: string) => {
const res = await fetch('/api/priority', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ task: description }),
});
return res.json();
},
onSuccess: (data) => {
if (data.taskAdded) {
setTasks(prev => [...prev, data.taskAdded]);
}
setNewTask('');
},
});
// Stream system updates
useEffect(() => {
const eventSource = new EventSource('/api/priority/stream');
eventSource.onmessage = (event) => {
const data = JSON.parse(event.data);
setSystemLoad(data);
};
return () => eventSource.close();
}, []);
// Group tasks by priority
const groupedTasks = groupBy(tasks, t => Priority[t.priority]);
return (
<div className="p-6 max-w-6xl mx-auto">
<div className="mb-8">
<h1 className="text-3xl font-bold mb-4">Priority Management System</h1>
{/* System Load Indicator */}
{systemLoad && (
<div className={`alert ${
systemLoad.load === 'critical' ? 'alert-error' :
systemLoad.load === 'high' ? 'alert-warning' :
'alert-info'
} mb-4`}>
<div>
<h3 className="font-bold">System Load: {systemLoad.load.toUpperCase()}</h3>
<p className="text-sm">{systemLoad.recommendation}</p>
<div className="stats stats-horizontal mt-2 bg-transparent">
<div className="stat px-2">
<div className="stat-title text-xs">Critical</div>
<div className="stat-value text-lg">
{systemLoad.metrics?.criticalTasks || 0}
</div>
</div>
<div className="stat px-2">
<div className="stat-title text-xs">High</div>
<div className="stat-value text-lg">
{systemLoad.metrics?.highTasks || 0}
</div>
</div>
<div className="stat px-2">
<div className="stat-title text-xs">Total</div>
<div className="stat-value text-lg">
{systemLoad.metrics?.totalTasks || 0}
</div>
</div>
</div>
</div>
</div>
)}
{/* Task Input */}
<div className="form-control mb-6">
<div className="input-group">
<input
type="text"
placeholder="Describe a new task..."
className="input input-bordered flex-1"
value={newTask}
onChange={(e) => setNewTask(e.target.value)}
onKeyPress={(e) => {
if (e.key === 'Enter' && newTask.trim()) {
submitTask.mutate(newTask);
}
}}
/>
<button
className="btn btn-primary"
onClick={() => newTask.trim() && submitTask.mutate(newTask)}
disabled={submitTask.isPending || !newTask.trim()}
>
{submitTask.isPending ? (
<span className="loading loading-spinner"></span>
) : (
'Add Task'
)}
</button>
</div>
</div>
{/* Priority Lanes */}
<div className="grid grid-cols-1 md:grid-cols-2 lg:grid-cols-4 gap-4">
{Object.entries(Priority)
.filter(([key]) => isNaN(Number(key)))
.map(([priorityName, priorityValue]) => (
<div key={priorityName} className="card bg-base-100 shadow-xl">
<div className="card-body p-4">
<h2 className={`card-title text-sm ${
priorityName === 'CRITICAL' ? 'text-error' :
priorityName === 'HIGH' ? 'text-warning' :
priorityName === 'MEDIUM' ? 'text-info' :
'text-base-content'
}`}>
{priorityName}
<div className="badge badge-sm">
{groupedTasks[priorityName]?.length || 0}
</div>
</h2>
<div className="space-y-2 mt-2">
{(groupedTasks[priorityName] || [])
.slice(0, 5)
.map(task => (
<div
key={task.id}
className="text-xs p-2 bg-base-200 rounded"
>
<div className="font-medium truncate">
{task.description}
</div>
{task.deadline && (
<div className="text-xs opacity-70 mt-1">
Due: {new Date(task.deadline).toLocaleDateString()}
</div>
)}
</div>
))}
{groupedTasks[priorityName]?.length > 5 && (
<div className="text-xs opacity-50 text-center">
+{groupedTasks[priorityName].length - 5} more
</div>
)}
</div>
</div>
</div>
))}
</div>
</div>
</div>
);
}
Creates an interactive dashboard showing real-time priority lanes, system load indicators, and task submission with visual feedback.
Conclusion
Prioritization in agentic design patterns transforms how AI systems handle complex workloads by introducing intelligent task management that adapts to real-world constraints. Through the combination of heap-based priority queues, LLM-powered priority assignment, and dynamic adjustment algorithms, we've created systems that mirror human decision-making while operating at machine scale. The basic implementation provides efficient O(log n) operations with multi-criteria sorting, while the advanced example demonstrates production-ready features including weighted scoring, multi-agent routing, and real-time load monitoring.
These patterns excel in serverless environments like Vercel, where stateless execution and rapid scaling are essential. By leveraging LangGraph's stateful workflows with es-toolkit's optimized utilities, the system maintains performance even under high load while providing the flexibility to adjust priorities based on changing business conditions. The key insight is that effective prioritization isn't just about ordering tasks - it's about creating adaptive systems that balance urgency, impact, and resource availability to maximize overall system effectiveness.