초안 에이전트 설계 패턴 - 우선순위 지정
TypeScript로 LangChain과 LangGraph를 사용하여 Vercel의 서버리스 플랫폼에 최적화된 지능형 작업 우선순위 시스템을 구축하는 방법을 배웁니다. 동적 우선순위 큐, 가중치 점수 알고리즘, 프로덕션 AI 에이전트를 위한 적응형 작업 관리 구현을 보여드립니다.
멘탈 모델: 응급실 트리아지 시스템
에이전트 우선순위 지정을 응급실 트리아지 시스템처럼 생각해보세요. 의료진이 중증도, 긴급성, 가용 자원을 기반으로 들어오는 환자를 평가하는 것처럼, AI 에이전트도 여러 기준을 사용해 작업을 평가해야 합니다. 병원에서 가슴 통증을 호소하는 환자가 경미한 상처를 입은 환자보다 우선 치료를 받는 것처럼, 에이전트는 중요한 고객 문제를 일상적인 문의보다 우선시합니다. 트리아지 간호사(우선순위 알고리즘)는 새로운 환자가 도착할 때마다 지속적으로 재평가하며(동적 재우선순위 지정), 전문 팀이 다양한 심각도 수준을 처리합니다(우선순위 기반 라우팅). 이 멘탈 모델은 단순 FIFO 큐가 복잡한 시나리오에서 실패하는 이유와 지능형 우선순위 지정이 시스템 효율성을 극적으로 향상시키는 방법을 이해하는 데 도움이 됩니다.
기본 예제: LangGraph를 사용한 작업 우선순위 큐
1. 우선순위 상태와 타입 정의
// types/priority.ts
import { z } from 'zod';
import { BaseMessage } from '@langchain/core/messages';
// 타입 안정성을 위한 enum 사용 우선순위 레벨
export enum Priority {
CRITICAL = 0, // 최고 우선순위
HIGH = 1,
MEDIUM = 2,
LOW = 3,
BACKGROUND = 4 // 최저 우선순위
}
// Zod 유효성 검사를 사용한 작업 스키마
export const TaskSchema = z.object({
id: z.string(),
description: z.string(),
priority: z.nativeEnum(Priority),
deadline: z.date().optional(),
dependencies: z.array(z.string()).default([]),
assignee: z.string().optional(),
createdAt: z.date(),
metadata: z.record(z.any()).default({})
});
export type Task = z.infer<typeof TaskSchema>;
// LangGraph 워크플로우를 위한 상태
export interface PriorityState {
tasks: Task[];
currentTask: Task | null;
completedTasks: string[];
messages: BaseMessage[];
metadata: Record<string, any>;
}
Zod 유효성 검사로 강력하게 타입화된 우선순위 레벨과 작업 구조를 정의하여 전체 우선순위 시스템에서 타입 안정성을 보장합니다.
2. 힙을 사용한 우선순위 큐 구현
// lib/priority-queue.ts
import { sortBy, remove } from 'es-toolkit';
import { Task, Priority } from '@/types/priority';
export class PriorityQueue {
private tasks: Task[] = [];
private maxSize: number = 10000;
// 힙 속성을 유지하면서 작업 추가
enqueue(task: Task): void {
if (this.tasks.length >= this.maxSize) {
// 우선순위가 낮은 오래된 작업 제거
const oldTasks = this.tasks.filter(t =>
t.priority === Priority.BACKGROUND &&
Date.now() - t.createdAt.getTime() > 3600000 // 1시간
);
if (oldTasks.length > 0) {
remove(this.tasks, t => t.id === oldTasks[0].id);
}
}
this.tasks.push(task);
this.heapifyUp();
}
// 최고 우선순위 작업 추출
dequeue(): Task | null {
if (this.tasks.length === 0) return null;
const task = this.tasks[0];
const last = this.tasks.pop();
if (this.tasks.length > 0 && last) {
this.tasks[0] = last;
this.heapifyDown();
}
return task;
}
private heapifyUp(): void {
let index = this.tasks.length - 1;
while (index > 0) {
const parentIndex = Math.floor((index - 1) / 2);
if (this.compare(this.tasks[index], this.tasks[parentIndex]) >= 0) {
break;
}
[this.tasks[index], this.tasks[parentIndex]] =
[this.tasks[parentIndex], this.tasks[index]];
index = parentIndex;
}
}
private heapifyDown(): void {
let index = 0;
while (2 * index + 1 < this.tasks.length) {
const leftChild = 2 * index + 1;
const rightChild = 2 * index + 2;
let smallest = index;
if (this.compare(this.tasks[leftChild], this.tasks[smallest]) < 0) {
smallest = leftChild;
}
if (rightChild < this.tasks.length &&
this.compare(this.tasks[rightChild], this.tasks[smallest]) < 0) {
smallest = rightChild;
}
if (smallest === index) break;
[this.tasks[index], this.tasks[smallest]] =
[this.tasks[smallest], this.tasks[index]];
index = smallest;
}
}
// 우선순위 정렬을 위한 작업 비교
private compare(a: Task, b: Task): number {
// 먼저 우선순위 레벨로 비교
if (a.priority !== b.priority) {
return a.priority - b.priority;
}
// 다음 마감일로 비교(존재하는 경우)
if (a.deadline && b.deadline) {
return a.deadline.getTime() - b.deadline.getTime();
}
if (a.deadline) return -1;
if (b.deadline) return 1;
// 마지막으로 생성 시간으로 비교(같은 우선순위의 경우 FIFO)
return a.createdAt.getTime() - b.createdAt.getTime();
}
// 우선순위순으로 정렬된 모든 작업 가져오기
getTasks(): Task[] {
return sortBy(this.tasks, [
t => t.priority,
t => t.deadline?.getTime() || Infinity,
t => t.createdAt.getTime()
]);
}
}
O(log n) 연산의 효율적인 최소 힙 우선순위 큐를 구현하며, 오래된 낮은 우선순위 작업의 자동 정리와 다중 기준 비교를 수행합니다.
3. 우선순위 할당 에이전트 생성
// lib/agents/priority-agent.ts
import { ChatGoogleGenerativeAI } from '@langchain/google-genai';
import { HumanMessage, SystemMessage } from '@langchain/core/messages';
import { Priority, Task } from '@/types/priority';
import { chunk } from 'es-toolkit';
export class PriorityAssignmentAgent {
private model: ChatGoogleGenerativeAI;
constructor() {
this.model = new ChatGoogleGenerativeAI({
modelName: 'gemini-2.5-flash',
temperature: 0,
maxOutputTokens: 1024,
});
}
async assignPriority(taskDescription: string): Promise<Priority> {
const systemPrompt = `당신은 작업 우선순위 지정 전문가입니다. 작업을 분석하고 우선순위 레벨을 할당하세요.
우선순위 레벨:
- CRITICAL (0): 시스템 장애, 보안 침해, 데이터 손실 위험, 프로덕션 다운
- HIGH (1): 고객 대면 문제, 수익 영향, SLA 위반
- MEDIUM (2): 중요한 기능, 중요하지 않은 버그, 표준 요청
- LOW (3): 있으면 좋은 기능, 사소한 개선, 문서화
- BACKGROUND (4): 정리 작업, 최적화, 연구
우선순위 레벨 이름만으로 응답하세요.`;
const response = await this.model.invoke([
new SystemMessage(systemPrompt),
new HumanMessage(`작업: ${taskDescription}`)
]);
const content = response.content.toString().trim().toUpperCase();
// 응답을 Priority enum에 매핑
switch (content) {
case 'CRITICAL': return Priority.CRITICAL;
case 'HIGH': return Priority.HIGH;
case 'MEDIUM': return Priority.MEDIUM;
case 'LOW': return Priority.LOW;
case 'BACKGROUND': return Priority.BACKGROUND;
default: return Priority.MEDIUM; // 안전한 기본값
}
}
async batchAssignPriorities(tasks: string[]): Promise<Priority[]> {
// 속도 제한을 피하기 위해 청크로 처리
const chunks = chunk(tasks, 5);
const results: Priority[] = [];
for (const batch of chunks) {
const promises = batch.map(task => this.assignPriority(task));
const priorities = await Promise.all(promises);
results.push(...priorities);
}
return results;
}
}
Gemini Flash를 사용하여 작업 설명을 지능적으로 분석하고 비즈니스 영향과 긴급성에 따라 적절한 우선순위 레벨을 할당합니다.
4. LangGraph 워크플로우 구축
// lib/workflows/priority-workflow.ts
import { StateGraph, END } from '@langchain/langgraph';
import { PriorityState, Task, Priority } from '@/types/priority';
import { PriorityQueue } from '@/lib/priority-queue';
import { PriorityAssignmentAgent } from '@/lib/agents/priority-agent';
import { v4 as uuidv4 } from 'uuid';
export function createPriorityWorkflow() {
const queue = new PriorityQueue();
const agent = new PriorityAssignmentAgent();
const workflow = new StateGraph<PriorityState>({
channels: {
tasks: {
value: (x: Task[], y: Task[]) => [...x, ...y],
default: () => [],
},
currentTask: {
value: (x: Task | null, y: Task | null) => y || x,
default: () => null,
},
completedTasks: {
value: (x: string[], y: string[]) => [...x, ...y],
default: () => [],
},
messages: {
value: (x, y) => [...x, ...y],
default: () => [],
},
metadata: {
value: (x, y) => ({...x, ...y}),
default: () => ({}),
},
},
});
// 노드: 새 작업을 받고 우선순위 지정
workflow.addNode('receive', async (state) => {
const { messages } = state;
const lastMessage = messages[messages.length - 1];
if (!lastMessage?.content) {
return { metadata: { error: '작업 설명이 제공되지 않았습니다' } };
}
const taskDescription = lastMessage.content.toString();
const priority = await agent.assignPriority(taskDescription);
const newTask: Task = {
id: uuidv4(),
description: taskDescription,
priority,
createdAt: new Date(),
dependencies: [],
metadata: {}
};
queue.enqueue(newTask);
return {
tasks: [newTask],
metadata: {
lastAdded: newTask.id,
queueSize: queue.getTasks().length
}
};
});
// 노드: 큐에서 다음 작업 선택
workflow.addNode('select', async (state) => {
const nextTask = queue.dequeue();
if (!nextTask) {
return {
currentTask: null,
metadata: { status: 'queue_empty' }
};
}
return {
currentTask: nextTask,
metadata: {
status: 'task_selected',
selectedId: nextTask.id,
selectedPriority: Priority[nextTask.priority]
}
};
});
// 노드: 현재 작업 처리
workflow.addNode('process', async (state) => {
const { currentTask } = state;
if (!currentTask) {
return { metadata: { error: '처리할 작업이 없습니다' } };
}
// 작업 처리 시뮬레이션
await new Promise(resolve => setTimeout(resolve, 100));
return {
completedTasks: [currentTask.id],
currentTask: null,
metadata: {
status: 'task_completed',
completedId: currentTask.id
}
};
});
// 엣지 정의
workflow.addEdge('receive', 'select');
workflow.addEdge('select', 'process');
workflow.addEdge('process', END);
workflow.setEntryPoint('receive');
return workflow.compile();
}
작업을 받고, 우선순위를 할당하고, 큐 선택을 관리하며, 우선순위 순서대로 작업을 처리하는 상태 저장 워크플로우를 생성합니다.
5. 우선순위 큐용 API 라우트
// app/api/priority/route.ts
import { createPriorityWorkflow } from '@/lib/workflows/priority-workflow';
import { HumanMessage } from '@langchain/core/messages';
import { NextResponse } from 'next/server';
export const runtime = 'nodejs';
export const maxDuration = 60;
export async function POST(req: Request) {
try {
const { task } = await req.json();
if (!task) {
return NextResponse.json(
{ error: '작업 설명이 필요합니다' },
{ status: 400 }
);
}
const workflow = createPriorityWorkflow();
const result = await workflow.invoke({
tasks: [],
currentTask: null,
completedTasks: [],
messages: [new HumanMessage(task)],
metadata: {}
});
return NextResponse.json({
taskAdded: result.tasks[0],
queueSize: result.metadata.queueSize,
status: result.metadata.status
});
} catch (error) {
console.error('우선순위 워크플로우 오류:', error);
return NextResponse.json(
{ error: '작업 처리 실패' },
{ status: 500 }
);
}
}
작업 설명을 받아들이고 큐 상태와 함께 우선순위 할당을 반환하는 REST API 엔드포인트로 우선순위 워크플로우를 노출합니다.
고급 예제: 동적 다중 에이전트 우선순위 시스템
1. 여러 기준을 사용한 가중치 점수
// lib/scoring/weighted-scorer.ts
import { Task } from '@/types/priority';
import { sum, map, zip } from 'es-toolkit';
interface ScoringCriteria {
urgency: number; // 0-10 척도
impact: number; // 0-10 척도
effort: number; // 0-10 척도 (역방향)
confidence: number; // 0-10 척도
customerValue: number; // 0-10 척도
}
interface ScoringWeights {
urgency: number;
impact: number;
effort: number;
confidence: number;
customerValue: number;
}
export class WeightedPriorityScorer {
private weights: ScoringWeights;
constructor(weights?: Partial<ScoringWeights>) {
// 기본 가중치 합계는 1.0
this.weights = {
urgency: weights?.urgency ?? 0.3,
impact: weights?.impact ?? 0.25,
effort: weights?.effort ?? 0.15,
confidence: weights?.confidence ?? 0.15,
customerValue: weights?.customerValue ?? 0.15,
};
}
// 가중치 우선순위 점수 계산
calculateScore(criteria: ScoringCriteria): number {
const scores = [
criteria.urgency * this.weights.urgency,
criteria.impact * this.weights.impact,
(10 - criteria.effort) * this.weights.effort, // 노력 반전
criteria.confidence * this.weights.confidence,
criteria.customerValue * this.weights.customerValue,
];
return sum(scores);
}
// LLM을 사용하여 작업에서 점수 기준 추출
async extractCriteria(
task: Task,
model: ChatGoogleGenerativeAI
): Promise<ScoringCriteria> {
const prompt = `이 작업을 분석하고 각 기준을 0-10으로 평가하세요:
작업: ${task.description}
기준:
- 긴급성: 얼마나 시간에 민감한가? (10 = 즉시, 0 = 기다릴 수 있음)
- 영향: 비즈니스/사용자 영향? (10 = 중대, 0 = 최소)
- 노력: 구현 복잡성? (10 = 매우 복잡, 0 = 간단)
- 신뢰도: 요구사항이 얼마나 명확한가? (10 = 매우 명확, 0 = 모호)
- 고객가치: 직접적인 고객 이익? (10 = 높은 가치, 0 = 내부용만)
JSON 형식으로 응답: {"urgency": X, "impact": Y, ...}`;
const response = await model.invoke([new HumanMessage(prompt)]);
const content = response.content.toString();
try {
const parsed = JSON.parse(content);
return {
urgency: parsed.urgency || 5,
impact: parsed.impact || 5,
effort: parsed.effort || 5,
confidence: parsed.confidence || 5,
customerValue: parsed.customerValue || 5,
};
} catch {
// 파싱 오류 시 기본 중간 점수
return {
urgency: 5,
impact: 5,
effort: 5,
confidence: 5,
customerValue: 5,
};
}
}
// 컨텍스트를 기반으로 가중치 동적 조정
adjustWeights(context: {
isEmergency?: boolean;
isCustomerFacing?: boolean;
resourcesLimited?: boolean;
}): void {
if (context.isEmergency) {
this.weights.urgency = 0.5;
this.weights.impact = 0.3;
this.normalizeWeights();
}
if (context.isCustomerFacing) {
this.weights.customerValue = 0.3;
this.normalizeWeights();
}
if (context.resourcesLimited) {
this.weights.effort = 0.25;
this.normalizeWeights();
}
}
private normalizeWeights(): void {
const total = sum(Object.values(this.weights));
for (const key in this.weights) {
this.weights[key as keyof ScoringWeights] /= total;
}
}
}
비즈니스 컨텍스트와 LLM 기반 기준 추출을 기반으로 한 동적 가중치 조정과 함께 RICE 스타일 가중치 점수를 구현합니다.
2. 다중 에이전트 우선순위 라우터
// lib/agents/priority-router.ts
import { StateGraph, END } from '@langchain/langgraph';
import { ChatGoogleGenerativeAI } from '@langchain/google-genai';
import { Priority, Task, PriorityState } from '@/types/priority';
import { groupBy, maxBy } from 'es-toolkit';
interface RouterState extends PriorityState {
routedTasks: Record<string, Task[]>;
agentAssignments: Record<string, string>;
}
export class PriorityRouter {
private criticalAgent: ChatGoogleGenerativeAI;
private standardAgent: ChatGoogleGenerativeAI;
private backgroundAgent: ChatGoogleGenerativeAI;
constructor() {
// 다른 우선순위 레벨에 다른 모델
this.criticalAgent = new ChatGoogleGenerativeAI({
modelName: 'gemini-2.5-pro',
temperature: 0,
maxOutputTokens: 4096,
});
this.standardAgent = new ChatGoogleGenerativeAI({
modelName: 'gemini-2.5-flash',
temperature: 0.3,
maxOutputTokens: 2048,
});
this.backgroundAgent = new ChatGoogleGenerativeAI({
modelName: 'gemini-2.5-flash',
temperature: 0.5,
maxOutputTokens: 1024,
});
}
createRouterWorkflow() {
const workflow = new StateGraph<RouterState>({
channels: {
tasks: {
value: (x, y) => [...x, ...y],
default: () => [],
},
routedTasks: {
value: (x, y) => ({...x, ...y}),
default: () => ({}),
},
agentAssignments: {
value: (x, y) => ({...x, ...y}),
default: () => ({}),
},
currentTask: {
value: (x, y) => y || x,
default: () => null,
},
completedTasks: {
value: (x, y) => [...x, ...y],
default: () => [],
},
messages: {
value: (x, y) => [...x, ...y],
default: () => [],
},
metadata: {
value: (x, y) => ({...x, ...y}),
default: () => ({}),
},
},
});
// 작업을 적절한 큐로 라우팅
workflow.addNode('route', async (state) => {
const { tasks } = state;
// 우선순위별로 작업 그룹화
const grouped = groupBy(tasks, t => Priority[t.priority]);
const routed: Record<string, Task[]> = {
critical: grouped['CRITICAL'] || [],
high: grouped['HIGH'] || [],
standard: [...(grouped['MEDIUM'] || []), ...(grouped['LOW'] || [])],
background: grouped['BACKGROUND'] || [],
};
// 우선순위에 따라 에이전트 할당
const assignments: Record<string, string> = {};
for (const task of tasks) {
if (task.priority === Priority.CRITICAL) {
assignments[task.id] = 'critical-agent';
} else if (task.priority === Priority.HIGH) {
assignments[task.id] = 'high-priority-agent';
} else if (task.priority <= Priority.LOW) {
assignments[task.id] = 'standard-agent';
} else {
assignments[task.id] = 'background-agent';
}
}
return {
routedTasks: routed,
agentAssignments: assignments,
metadata: {
criticalCount: routed.critical.length,
highCount: routed.high.length,
standardCount: routed.standard.length,
backgroundCount: routed.background.length,
}
};
});
// 전용 리소스로 중요한 작업 처리
workflow.addNode('process-critical', async (state) => {
const { routedTasks } = state;
const criticalTasks = routedTasks.critical || [];
if (criticalTasks.length === 0) {
return { metadata: { criticalStatus: 'none' } };
}
// 고성능 모델로 처리
const results = await Promise.all(
criticalTasks.map(async task => {
const response = await this.criticalAgent.invoke([
new HumanMessage(`중요 작업: ${task.description}`)
]);
return {
taskId: task.id,
result: response.content,
processingTime: Date.now()
};
})
);
return {
completedTasks: criticalTasks.map(t => t.id),
metadata: {
criticalProcessed: results.length,
criticalResults: results
}
};
});
// 표준 작업을 배치로 처리
workflow.addNode('process-standard', async (state) => {
const { routedTasks } = state;
const standardTasks = routedTasks.standard || [];
if (standardTasks.length === 0) {
return { metadata: { standardStatus: 'none' } };
}
// 효율성을 위한 배치 처리
const batchSize = 5;
const completed: string[] = [];
for (let i = 0; i < standardTasks.length; i += batchSize) {
const batch = standardTasks.slice(i, i + batchSize);
const batchPromises = batch.map(task =>
this.standardAgent.invoke([
new HumanMessage(`작업: ${task.description}`)
])
);
await Promise.all(batchPromises);
completed.push(...batch.map(t => t.id));
}
return {
completedTasks: completed,
metadata: { standardProcessed: completed.length }
};
});
// 백그라운드 작업 스케줄링
workflow.addNode('schedule-background', async (state) => {
const { routedTasks } = state;
const backgroundTasks = routedTasks.background || [];
if (backgroundTasks.length === 0) {
return { metadata: { backgroundStatus: 'none' } };
}
// 오프피크 처리를 위한 큐잉
const scheduled = backgroundTasks.map(task => ({
taskId: task.id,
scheduledFor: new Date(Date.now() + 3600000), // 1시간 후
agent: 'background-agent'
}));
return {
metadata: {
backgroundScheduled: scheduled.length,
scheduledTasks: scheduled
}
};
});
// 작업 존재에 기반한 조건부 라우팅
workflow.addConditionalEdges('route', (state) => {
const { routedTasks } = state;
const edges = [];
if (routedTasks.critical?.length > 0) edges.push('process-critical');
if (routedTasks.standard?.length > 0) edges.push('process-standard');
if (routedTasks.background?.length > 0) edges.push('schedule-background');
return edges.length > 0 ? edges : [END];
});
workflow.addEdge('process-critical', END);
workflow.addEdge('process-standard', END);
workflow.addEdge('schedule-background', END);
workflow.setEntryPoint('route');
return workflow.compile();
}
}
각 우선순위 레벨에 대해 다른 모델과 처리 전략을 사용하는 정교한 다중 에이전트 라우팅을 구현합니다.
3. 동적 우선순위 조정 시스템
// lib/priority/dynamic-adjuster.ts
import { Task, Priority } from '@/types/priority';
import { mean, standardDeviation } from 'es-toolkit';
import { kv } from '@vercel/kv';
interface TaskMetrics {
taskId: string;
attempts: number;
failureRate: number;
averageProcessingTime: number;
lastAttempt: Date;
ageInHours: number;
}
export class DynamicPriorityAdjuster {
private readonly AGING_FACTOR = 0.1; // 시간당 우선순위 부스트
private readonly FAILURE_THRESHOLD = 0.3; // 30% 실패율이 부스트 트리거
private readonly STARVATION_HOURS = 4; // 기아 방지가 시작되기 전 시간
async adjustPriority(task: Task, metrics: TaskMetrics): Promise<Priority> {
let priorityScore = task.priority;
// 나이 기반 조정 (기아 방지)
if (metrics.ageInHours > this.STARVATION_HOURS) {
const ageBoost = Math.floor(
(metrics.ageInHours - this.STARVATION_HOURS) * this.AGING_FACTOR
);
priorityScore = Math.max(0, priorityScore - ageBoost);
}
// 실패율 조정
if (metrics.failureRate > this.FAILURE_THRESHOLD) {
priorityScore = Math.max(0, priorityScore - 1);
}
// 마감일 근접성 조정
if (task.deadline) {
const hoursUntilDeadline =
(task.deadline.getTime() - Date.now()) / 3600000;
if (hoursUntilDeadline < 24) {
priorityScore = Priority.HIGH;
}
if (hoursUntilDeadline < 4) {
priorityScore = Priority.CRITICAL;
}
}
// 조정 이력 저장
await this.storeAdjustment(task.id, task.priority, priorityScore);
return priorityScore as Priority;
}
async calculateSystemLoad(): Promise<{
load: 'low' | 'normal' | 'high' | 'critical';
metrics: Record<string, number>;
}> {
// KV 스토어에서 최근 작업 메트릭 가져오기
const recentTasks = await kv.lrange('task-queue', 0, 100);
if (!recentTasks || recentTasks.length === 0) {
return { load: 'low', metrics: {} };
}
const priorities = recentTasks.map((t: any) => t.priority);
const criticalCount = priorities.filter(p => p === Priority.CRITICAL).length;
const highCount = priorities.filter(p => p === Priority.HIGH).length;
const avgPriority = mean(priorities);
const stdDev = standardDeviation(priorities);
// 시스템 부하 결정
let load: 'low' | 'normal' | 'high' | 'critical';
if (criticalCount > 10 || avgPriority < 1) {
load = 'critical';
} else if (highCount > 20 || avgPriority < 2) {
load = 'high';
} else if (avgPriority < 3) {
load = 'normal';
} else {
load = 'low';
}
return {
load,
metrics: {
averagePriority: avgPriority,
standardDeviation: stdDev,
criticalTasks: criticalCount,
highTasks: highCount,
totalTasks: recentTasks.length,
}
};
}
async optimizeThroughput(tasks: Task[]): Promise<Task[]> {
const systemLoad = await this.calculateSystemLoad();
// 부하에 따라 다른 전략 적용
switch (systemLoad.load) {
case 'critical':
// 중요한 작업에만 집중
return tasks
.filter(t => t.priority <= Priority.HIGH)
.sort((a, b) => a.priority - b.priority);
case 'high':
// 중요 및 높은 우선순위 균형
return tasks
.filter(t => t.priority <= Priority.MEDIUM)
.sort((a, b) => {
// 일부 중간 우선순위와 높은 우선순위 인터리브
if (a.priority === b.priority) return 0;
if (a.priority === Priority.CRITICAL) return -1;
if (b.priority === Priority.CRITICAL) return 1;
// 높음과 중간을 3:1 비율로 혼합
return Math.random() > 0.25 ?
a.priority - b.priority :
b.priority - a.priority;
});
case 'normal':
// 표준 우선순위 정렬
return tasks.sort((a, b) => a.priority - b.priority);
case 'low':
// 모든 작업 처리, 백그라운드 작업 증진
return tasks.sort((a, b) => {
// 백그라운드 작업에 기회 제공
if (a.priority === Priority.BACKGROUND &&
b.priority === Priority.LOW) {
return Math.random() > 0.7 ? -1 : 1;
}
return a.priority - b.priority;
});
}
}
private async storeAdjustment(
taskId: string,
originalPriority: Priority,
adjustedPriority: Priority
): Promise<void> {
const adjustment = {
taskId,
originalPriority: Priority[originalPriority],
adjustedPriority: Priority[adjustedPriority],
timestamp: new Date().toISOString(),
reason: this.determineAdjustmentReason(originalPriority, adjustedPriority)
};
await kv.lpush('priority-adjustments', adjustment);
await kv.ltrim('priority-adjustments', 0, 999); // 마지막 1000개 유지
}
private determineAdjustmentReason(
original: Priority,
adjusted: Priority
): string {
if (adjusted < original) {
return 'priority_increased';
} else if (adjusted > original) {
return 'priority_decreased';
}
return 'no_change';
}
}
노화 방지, 실패 기반 부스팅 및 부하 인식 처리량 최적화를 통한 동적 우선순위 조정을 구현합니다.
4. 스트리밍 우선순위 업데이트
// app/api/priority/stream/route.ts
import { DynamicPriorityAdjuster } from '@/lib/priority/dynamic-adjuster';
import { PriorityRouter } from '@/lib/agents/priority-router';
import { Task } from '@/types/priority';
export const runtime = 'nodejs';
export const maxDuration = 300;
export async function GET(req: Request) {
const encoder = new TextEncoder();
const stream = new TransformStream();
const writer = stream.writable.getWriter();
const adjuster = new DynamicPriorityAdjuster();
const router = new PriorityRouter();
const workflow = router.createRouterWorkflow();
// 매초마다 우선순위 업데이트 스트리밍
const interval = setInterval(async () => {
try {
const systemLoad = await adjuster.calculateSystemLoad();
const update = {
timestamp: new Date().toISOString(),
load: systemLoad.load,
metrics: systemLoad.metrics,
recommendation: getRecommendation(systemLoad.load)
};
await writer.write(
encoder.encode(`data: ${JSON.stringify(update)}\n\n`)
);
} catch (error) {
console.error('스트림 오류:', error);
}
}, 1000);
// 클라이언트 연결 해제 시 정리
req.signal.addEventListener('abort', () => {
clearInterval(interval);
writer.close();
});
return new Response(stream.readable, {
headers: {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
},
});
}
function getRecommendation(load: string): string {
switch (load) {
case 'critical':
return '시스템 과부하. 에이전트 스케일링 또는 낮은 우선순위 작업 연기를 고려하세요.';
case 'high':
return '높은 부하 감지. 면밀히 모니터링하고 스케일 준비를 하세요.';
case 'normal':
return '시스템이 정상적으로 작동 중입니다.';
case 'low':
return '낮은 활용도. 유지 보수 또는 백그라운드 작업에 좋은 시간입니다.';
default:
return '알 수 없는 시스템 상태.';
}
}
SSE 스트리밍으로 실시간 우선순위 시스템 모니터링을 제공하여 대시보드가 실시간 시스템 부하와 권장 사항을 표시할 수 있게 합니다.
5. 프론트엔드 우선순위 대시보드
// components/PriorityDashboard.tsx
'use client';
import { useQuery, useMutation } from '@tanstack/react-query';
import { useState, useEffect } from 'react';
import { Task, Priority } from '@/types/priority';
import { groupBy, sortBy } from 'es-toolkit';
export default function PriorityDashboard() {
const [tasks, setTasks] = useState<Task[]>([]);
const [systemLoad, setSystemLoad] = useState<any>(null);
const [newTask, setNewTask] = useState('');
// 새 작업 제출
const submitTask = useMutation({
mutationFn: async (description: string) => {
const res = await fetch('/api/priority', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ task: description }),
});
return res.json();
},
onSuccess: (data) => {
if (data.taskAdded) {
setTasks(prev => [...prev, data.taskAdded]);
}
setNewTask('');
},
});
// 시스템 업데이트 스트림
useEffect(() => {
const eventSource = new EventSource('/api/priority/stream');
eventSource.onmessage = (event) => {
const data = JSON.parse(event.data);
setSystemLoad(data);
};
return () => eventSource.close();
}, []);
// 우선순위별 작업 그룹화
const groupedTasks = groupBy(tasks, t => Priority[t.priority]);
return (
<div className="p-6 max-w-6xl mx-auto">
<div className="mb-8">
<h1 className="text-3xl font-bold mb-4">우선순위 관리 시스템</h1>
{/* 시스템 부하 표시기 */}
{systemLoad && (
<div className={`alert ${
systemLoad.load === 'critical' ? 'alert-error' :
systemLoad.load === 'high' ? 'alert-warning' :
'alert-info'
} mb-4`}>
<div>
<h3 className="font-bold">시스템 부하: {systemLoad.load.toUpperCase()}</h3>
<p className="text-sm">{systemLoad.recommendation}</p>
<div className="stats stats-horizontal mt-2 bg-transparent">
<div className="stat px-2">
<div className="stat-title text-xs">중요</div>
<div className="stat-value text-lg">
{systemLoad.metrics?.criticalTasks || 0}
</div>
</div>
<div className="stat px-2">
<div className="stat-title text-xs">높음</div>
<div className="stat-value text-lg">
{systemLoad.metrics?.highTasks || 0}
</div>
</div>
<div className="stat px-2">
<div className="stat-title text-xs">총계</div>
<div className="stat-value text-lg">
{systemLoad.metrics?.totalTasks || 0}
</div>
</div>
</div>
</div>
</div>
)}
{/* 작업 입력 */}
<div className="form-control mb-6">
<div className="input-group">
<input
type="text"
placeholder="새 작업을 설명하세요..."
className="input input-bordered flex-1"
value={newTask}
onChange={(e) => setNewTask(e.target.value)}
onKeyPress={(e) => {
if (e.key === 'Enter' && newTask.trim()) {
submitTask.mutate(newTask);
}
}}
/>
<button
className="btn btn-primary"
onClick={() => newTask.trim() && submitTask.mutate(newTask)}
disabled={submitTask.isPending || !newTask.trim()}
>
{submitTask.isPending ? (
<span className="loading loading-spinner"></span>
) : (
'작업 추가'
)}
</button>
</div>
</div>
{/* 우선순위 레인 */}
<div className="grid grid-cols-1 md:grid-cols-2 lg:grid-cols-4 gap-4">
{Object.entries(Priority)
.filter(([key]) => isNaN(Number(key)))
.map(([priorityName, priorityValue]) => (
<div key={priorityName} className="card bg-base-100 shadow-xl">
<div className="card-body p-4">
<h2 className={`card-title text-sm ${
priorityName === 'CRITICAL' ? 'text-error' :
priorityName === 'HIGH' ? 'text-warning' :
priorityName === 'MEDIUM' ? 'text-info' :
'text-base-content'
}`}>
{priorityName}
<div className="badge badge-sm">
{groupedTasks[priorityName]?.length || 0}
</div>
</h2>
<div className="space-y-2 mt-2">
{(groupedTasks[priorityName] || [])
.slice(0, 5)
.map(task => (
<div
key={task.id}
className="text-xs p-2 bg-base-200 rounded"
>
<div className="font-medium truncate">
{task.description}
</div>
{task.deadline && (
<div className="text-xs opacity-70 mt-1">
마감: {new Date(task.deadline).toLocaleDateString()}
</div>
)}
</div>
))}
{groupedTasks[priorityName]?.length > 5 && (
<div className="text-xs opacity-50 text-center">
+{groupedTasks[priorityName].length - 5} 더보기
</div>
)}
</div>
</div>
</div>
))}
</div>
</div>
</div>
);
}
실시간 우선순위 레인, 시스템 부하 표시기 및 시각적 피드백이 있는 작업 제출을 보여주는 대화형 대시보드를 생성합니다.
결론
에이전트 설계 패턴의 우선순위 지정은 현실 세계 제약에 적응하는 지능형 작업 관리를 도입하여 AI 시스템이 복잡한 워크로드를 처리하는 방식을 변혁합니다. 힙 기반 우선순위 큐, LLM 기반 우선순위 할당 및 동적 조정 알고리즘의 조합을 통해, 다중 기준 정렬을 갖춘 효율적인 O(log n) 작업을 제공하는 기본 구현과 가중치 점수, 다중 에이전트 라우팅 및 실시간 부하 모니터링을 포함한 프로덕션 준비 기능을 보여주는 고급 예제를 만들었습니다.
이러한 패턴은 상태 비저장 실행과 빠른 확장이 필수적인 Vercel과 같은 서버리스 환경에서 탁월합니다. es-toolkit의 최적화된 유틸리티와 함께 LangGraph의 상태 저장 워크플로우를 활용하여, 시스템은 높은 부하에서도 성능을 유지하면서 변화하는 비즈니스 조건에 따라 우선순위를 조정할 수 있는 유연성을 제공합니다. 핵심 통찰은 효과적인 우선순위 지정이 단순히 작업을 정렬하는 것이 아니라 긴급성, 영향력 및 리소스 가용성의 균형을 맞춰 전체 시스템 효율성을 최대화하는 적응형 시스템을 만드는 것이라는 점입니다.