ドラフト エージェント設計パターン - 優先順位付け
TypeScriptでLangChainとLangGraphを使用して、Vercelのサーバーレスプラットフォームに最適化されたインテリジェントなタスク優先順位付けシステムの構築方法を学びます。動的な優先度キュー、重み付けスコアリングアルゴリズム、本番AIエージェント向けの適応的タスク管理の実装を解説します。
メンタルモデル:救急外来のトリアージシステム
エージェントの優先順位付けを救急外来のトリアージシステムのように考えてみてください。医療スタッフが重症度、緊急度、利用可能なリソースに基づいて来院患者を評価するのと同じように、AIエージェントは複数の基準を使用してタスクを評価する必要があります。病院では、胸痛を訴える患者が軽い切り傷の患者よりも優先的に治療を受けるのと同様に、エージェントは重要な顧客の問題を日常的な問い合わせよりも優先します。トリアージ看護師(優先順位付けアルゴリズム)は新しい患者が到着するたびに継続的に再評価を行い(動的な再優先順位付け)、専門チームが異なる重症度レベルを処理します(優先度ベースのルーティング)。このメンタルモデルは、単純なFIFOキューが複雑なシナリオで失敗する理由と、インテリジェントな優先順位付けがシステムの効率を劇的に改善する方法を理解するのに役立ちます。
基本例:LangGraphを使用したタスク優先度キュー
1. 優先度の状態と型の定義
// types/priority.ts
import { z } from 'zod';
import { BaseMessage } from '@langchain/core/messages';
// 型安全性のためのenumを使用した優先度レベル
export enum Priority {
CRITICAL = 0, // 最高優先度
HIGH = 1,
MEDIUM = 2,
LOW = 3,
BACKGROUND = 4 // 最低優先度
}
// Zodバリデーションを使用したタスクスキーマ
export const TaskSchema = z.object({
id: z.string(),
description: z.string(),
priority: z.nativeEnum(Priority),
deadline: z.date().optional(),
dependencies: z.array(z.string()).default([]),
assignee: z.string().optional(),
createdAt: z.date(),
metadata: z.record(z.any()).default({})
});
export type Task = z.infer<typeof TaskSchema>;
// LangGraphワークフローの状態
export interface PriorityState {
tasks: Task[];
currentTask: Task | null;
completedTasks: string[];
messages: BaseMessage[];
metadata: Record<string, any>;
}
Zodバリデーションを使用した強く型付けされた優先度レベルとタスク構造を定義し、優先順位付けシステム全体での型安全性を確保します。
2. ヒープを使用した優先度キューの実装
// lib/priority-queue.ts
import { sortBy, remove } from 'es-toolkit';
import { Task, Priority } from '@/types/priority';
export class PriorityQueue {
private tasks: Task[] = [];
private maxSize: number = 10000;
// ヒープ特性を維持しながらタスクを追加
enqueue(task: Task): void {
if (this.tasks.length >= this.maxSize) {
// 優先度の低い古いタスクを削除
const oldTasks = this.tasks.filter(t =>
t.priority === Priority.BACKGROUND &&
Date.now() - t.createdAt.getTime() > 3600000 // 1時間
);
if (oldTasks.length > 0) {
remove(this.tasks, t => t.id === oldTasks[0].id);
}
}
this.tasks.push(task);
this.heapifyUp();
}
// 最高優先度のタスクを抽出
dequeue(): Task | null {
if (this.tasks.length === 0) return null;
const task = this.tasks[0];
const last = this.tasks.pop();
if (this.tasks.length > 0 && last) {
this.tasks[0] = last;
this.heapifyDown();
}
return task;
}
private heapifyUp(): void {
let index = this.tasks.length - 1;
while (index > 0) {
const parentIndex = Math.floor((index - 1) / 2);
if (this.compare(this.tasks[index], this.tasks[parentIndex]) >= 0) {
break;
}
[this.tasks[index], this.tasks[parentIndex]] =
[this.tasks[parentIndex], this.tasks[index]];
index = parentIndex;
}
}
private heapifyDown(): void {
let index = 0;
while (2 * index + 1 < this.tasks.length) {
const leftChild = 2 * index + 1;
const rightChild = 2 * index + 2;
let smallest = index;
if (this.compare(this.tasks[leftChild], this.tasks[smallest]) < 0) {
smallest = leftChild;
}
if (rightChild < this.tasks.length &&
this.compare(this.tasks[rightChild], this.tasks[smallest]) < 0) {
smallest = rightChild;
}
if (smallest === index) break;
[this.tasks[index], this.tasks[smallest]] =
[this.tasks[smallest], this.tasks[index]];
index = smallest;
}
}
// 優先順位付けのためのタスク比較
private compare(a: Task, b: Task): number {
// 最初に優先度レベルで比較
if (a.priority !== b.priority) {
return a.priority - b.priority;
}
// 次に期限で比較(存在する場合)
if (a.deadline && b.deadline) {
return a.deadline.getTime() - b.deadline.getTime();
}
if (a.deadline) return -1;
if (b.deadline) return 1;
// 最後に作成時間で比較(同じ優先度の場合はFIFO)
return a.createdAt.getTime() - b.createdAt.getTime();
}
// 優先度順にソートされたすべてのタスクを取得
getTasks(): Task[] {
return sortBy(this.tasks, [
t => t.priority,
t => t.deadline?.getTime() || Infinity,
t => t.createdAt.getTime()
]);
}
}
O(log n)操作の効率的な最小ヒープ優先度キューを実装し、古い低優先度タスクの自動クリーンアップと複数基準での比較を行います。
3. 優先度割り当てエージェントの作成
// lib/agents/priority-agent.ts
import { ChatGoogleGenerativeAI } from '@langchain/google-genai';
import { HumanMessage, SystemMessage } from '@langchain/core/messages';
import { Priority, Task } from '@/types/priority';
import { chunk } from 'es-toolkit';
export class PriorityAssignmentAgent {
private model: ChatGoogleGenerativeAI;
constructor() {
this.model = new ChatGoogleGenerativeAI({
modelName: 'gemini-2.5-flash',
temperature: 0,
maxOutputTokens: 1024,
});
}
async assignPriority(taskDescription: string): Promise<Priority> {
const systemPrompt = `タスクの優先順位付けの専門家として、タスクを分析し優先度レベルを割り当ててください。
優先度レベル:
- CRITICAL (0): システム障害、セキュリティ侵害、データ損失リスク、本番環境ダウン
- HIGH (1): 顧客向けの問題、収益への影響、SLA違反
- MEDIUM (2): 重要な機能、重要でないバグ、標準的なリクエスト
- LOW (3): あると良い機能、小さな改善、ドキュメント
- BACKGROUND (4): クリーンアップタスク、最適化、研究
優先度レベル名のみで応答してください。`;
const response = await this.model.invoke([
new SystemMessage(systemPrompt),
new HumanMessage(`タスク: ${taskDescription}`)
]);
const content = response.content.toString().trim().toUpperCase();
// 応答を優先度enumにマッピング
switch (content) {
case 'CRITICAL': return Priority.CRITICAL;
case 'HIGH': return Priority.HIGH;
case 'MEDIUM': return Priority.MEDIUM;
case 'LOW': return Priority.LOW;
case 'BACKGROUND': return Priority.BACKGROUND;
default: return Priority.MEDIUM; // 安全なデフォルト
}
}
async batchAssignPriorities(tasks: string[]): Promise<Priority[]> {
// レート制限を避けるためにチャンクで処理
const chunks = chunk(tasks, 5);
const results: Priority[] = [];
for (const batch of chunks) {
const promises = batch.map(task => this.assignPriority(task));
const priorities = await Promise.all(promises);
results.push(...priorities);
}
return results;
}
}
Gemini Flashを使用して、タスクの説明をインテリジェントに分析し、ビジネスインパクトと緊急度に基づいて適切な優先度レベルを割り当てます。
4. LangGraphワークフローの構築
// lib/workflows/priority-workflow.ts
import { StateGraph, END } from '@langchain/langgraph';
import { PriorityState, Task, Priority } from '@/types/priority';
import { PriorityQueue } from '@/lib/priority-queue';
import { PriorityAssignmentAgent } from '@/lib/agents/priority-agent';
import { v4 as uuidv4 } from 'uuid';
export function createPriorityWorkflow() {
const queue = new PriorityQueue();
const agent = new PriorityAssignmentAgent();
const workflow = new StateGraph<PriorityState>({
channels: {
tasks: {
value: (x: Task[], y: Task[]) => [...x, ...y],
default: () => [],
},
currentTask: {
value: (x: Task | null, y: Task | null) => y || x,
default: () => null,
},
completedTasks: {
value: (x: string[], y: string[]) => [...x, ...y],
default: () => [],
},
messages: {
value: (x, y) => [...x, ...y],
default: () => [],
},
metadata: {
value: (x, y) => ({...x, ...y}),
default: () => ({}),
},
},
});
// ノード:新しいタスクを受信して優先順位付け
workflow.addNode('receive', async (state) => {
const { messages } = state;
const lastMessage = messages[messages.length - 1];
if (!lastMessage?.content) {
return { metadata: { error: 'タスクの説明が提供されていません' } };
}
const taskDescription = lastMessage.content.toString();
const priority = await agent.assignPriority(taskDescription);
const newTask: Task = {
id: uuidv4(),
description: taskDescription,
priority,
createdAt: new Date(),
dependencies: [],
metadata: {}
};
queue.enqueue(newTask);
return {
tasks: [newTask],
metadata: {
lastAdded: newTask.id,
queueSize: queue.getTasks().length
}
};
});
// ノード:キューから次のタスクを選択
workflow.addNode('select', async (state) => {
const nextTask = queue.dequeue();
if (!nextTask) {
return {
currentTask: null,
metadata: { status: 'queue_empty' }
};
}
return {
currentTask: nextTask,
metadata: {
status: 'task_selected',
selectedId: nextTask.id,
selectedPriority: Priority[nextTask.priority]
}
};
});
// ノード:現在のタスクを処理
workflow.addNode('process', async (state) => {
const { currentTask } = state;
if (!currentTask) {
return { metadata: { error: '処理するタスクがありません' } };
}
// タスク処理のシミュレーション
await new Promise(resolve => setTimeout(resolve, 100));
return {
completedTasks: [currentTask.id],
currentTask: null,
metadata: {
status: 'task_completed',
completedId: currentTask.id
}
};
});
// エッジの定義
workflow.addEdge('receive', 'select');
workflow.addEdge('select', 'process');
workflow.addEdge('process', END);
workflow.setEntryPoint('receive');
return workflow.compile();
}
タスクを受信し、優先度を割り当て、キューの選択を管理し、優先順位でタスクを処理するステートフルなワークフローを作成します。
5. 優先度キューのAPIルート
// app/api/priority/route.ts
import { createPriorityWorkflow } from '@/lib/workflows/priority-workflow';
import { HumanMessage } from '@langchain/core/messages';
import { NextResponse } from 'next/server';
export const runtime = 'nodejs';
export const maxDuration = 60;
export async function POST(req: Request) {
try {
const { task } = await req.json();
if (!task) {
return NextResponse.json(
{ error: 'タスクの説明が必要です' },
{ status: 400 }
);
}
const workflow = createPriorityWorkflow();
const result = await workflow.invoke({
tasks: [],
currentTask: null,
completedTasks: [],
messages: [new HumanMessage(task)],
metadata: {}
});
return NextResponse.json({
taskAdded: result.tasks[0],
queueSize: result.metadata.queueSize,
status: result.metadata.status
});
} catch (error) {
console.error('優先度ワークフローエラー:', error);
return NextResponse.json(
{ error: 'タスクの処理に失敗しました' },
{ status: 500 }
);
}
}
タスクの説明を受け取り、優先度の割り当てとキューステータスを返すREST APIエンドポイントとして優先度ワークフローを公開します。
高度な例:動的マルチエージェント優先度システム
1. 複数の基準を使用した重み付けスコアリング
// lib/scoring/weighted-scorer.ts
import { Task } from '@/types/priority';
import { sum, map, zip } from 'es-toolkit';
interface ScoringCriteria {
urgency: number; // 0-10スケール
impact: number; // 0-10スケール
effort: number; // 0-10スケール(逆)
confidence: number; // 0-10スケール
customerValue: number; // 0-10スケール
}
interface ScoringWeights {
urgency: number;
impact: number;
effort: number;
confidence: number;
customerValue: number;
}
export class WeightedPriorityScorer {
private weights: ScoringWeights;
constructor(weights?: Partial<ScoringWeights>) {
// デフォルトの重みは合計1.0
this.weights = {
urgency: weights?.urgency ?? 0.3,
impact: weights?.impact ?? 0.25,
effort: weights?.effort ?? 0.15,
confidence: weights?.confidence ?? 0.15,
customerValue: weights?.customerValue ?? 0.15,
};
}
// 重み付けされた優先度スコアを計算
calculateScore(criteria: ScoringCriteria): number {
const scores = [
criteria.urgency * this.weights.urgency,
criteria.impact * this.weights.impact,
(10 - criteria.effort) * this.weights.effort, // エフォートを反転
criteria.confidence * this.weights.confidence,
criteria.customerValue * this.weights.customerValue,
];
return sum(scores);
}
// LLMを使用してタスクからスコアリング基準を抽出
async extractCriteria(
task: Task,
model: ChatGoogleGenerativeAI
): Promise<ScoringCriteria> {
const prompt = `このタスクを分析し、各基準を0-10で評価してください:
タスク: ${task.description}
基準:
- 緊急度: どのくらい時間が重要か?(10 = 即座、0 = 待てる)
- 影響: ビジネス/ユーザーへの影響?(10 = 重大、0 = 最小)
- エフォート: 実装の複雑さ?(10 = 非常に複雑、0 = 簡単)
- 信頼度: 要件がどれだけ明確か?(10 = 非常に明確、0 = 曖昧)
- 顧客価値: 直接的な顧客の利益?(10 = 高価値、0 = 内部のみ)
JSON形式で応答してください: {"urgency": X, "impact": Y, ...}`;
const response = await model.invoke([new HumanMessage(prompt)]);
const content = response.content.toString();
try {
const parsed = JSON.parse(content);
return {
urgency: parsed.urgency || 5,
impact: parsed.impact || 5,
effort: parsed.effort || 5,
confidence: parsed.confidence || 5,
customerValue: parsed.customerValue || 5,
};
} catch {
// パースエラーの場合はデフォルトで中間スコア
return {
urgency: 5,
impact: 5,
effort: 5,
confidence: 5,
customerValue: 5,
};
}
}
// コンテキストに基づいて重みを動的に調整
adjustWeights(context: {
isEmergency?: boolean;
isCustomerFacing?: boolean;
resourcesLimited?: boolean;
}): void {
if (context.isEmergency) {
this.weights.urgency = 0.5;
this.weights.impact = 0.3;
this.normalizeWeights();
}
if (context.isCustomerFacing) {
this.weights.customerValue = 0.3;
this.normalizeWeights();
}
if (context.resourcesLimited) {
this.weights.effort = 0.25;
this.normalizeWeights();
}
}
private normalizeWeights(): void {
const total = sum(Object.values(this.weights));
for (const key in this.weights) {
this.weights[key as keyof ScoringWeights] /= total;
}
}
}
ビジネスコンテキストに基づく動的な重み調整とLLMベースの基準抽出を備えたRICEスタイルの重み付けスコアリングを実装します。
2. マルチエージェント優先度ルーター
// lib/agents/priority-router.ts
import { StateGraph, END } from '@langchain/langgraph';
import { ChatGoogleGenerativeAI } from '@langchain/google-genai';
import { Priority, Task, PriorityState } from '@/types/priority';
import { groupBy, maxBy } from 'es-toolkit';
interface RouterState extends PriorityState {
routedTasks: Record<string, Task[]>;
agentAssignments: Record<string, string>;
}
export class PriorityRouter {
private criticalAgent: ChatGoogleGenerativeAI;
private standardAgent: ChatGoogleGenerativeAI;
private backgroundAgent: ChatGoogleGenerativeAI;
constructor() {
// 異なる優先度レベルに異なるモデル
this.criticalAgent = new ChatGoogleGenerativeAI({
modelName: 'gemini-2.5-pro',
temperature: 0,
maxOutputTokens: 4096,
});
this.standardAgent = new ChatGoogleGenerativeAI({
modelName: 'gemini-2.5-flash',
temperature: 0.3,
maxOutputTokens: 2048,
});
this.backgroundAgent = new ChatGoogleGenerativeAI({
modelName: 'gemini-2.5-flash',
temperature: 0.5,
maxOutputTokens: 1024,
});
}
createRouterWorkflow() {
const workflow = new StateGraph<RouterState>({
channels: {
tasks: {
value: (x, y) => [...x, ...y],
default: () => [],
},
routedTasks: {
value: (x, y) => ({...x, ...y}),
default: () => ({}),
},
agentAssignments: {
value: (x, y) => ({...x, ...y}),
default: () => ({}),
},
currentTask: {
value: (x, y) => y || x,
default: () => null,
},
completedTasks: {
value: (x, y) => [...x, ...y],
default: () => [],
},
messages: {
value: (x, y) => [...x, ...y],
default: () => [],
},
metadata: {
value: (x, y) => ({...x, ...y}),
default: () => ({}),
},
},
});
// タスクを適切なキューにルーティング
workflow.addNode('route', async (state) => {
const { tasks } = state;
// 優先度別にタスクをグループ化
const grouped = groupBy(tasks, t => Priority[t.priority]);
const routed: Record<string, Task[]> = {
critical: grouped['CRITICAL'] || [],
high: grouped['HIGH'] || [],
standard: [...(grouped['MEDIUM'] || []), ...(grouped['LOW'] || [])],
background: grouped['BACKGROUND'] || [],
};
// 優先度に基づいてエージェントを割り当て
const assignments: Record<string, string> = {};
for (const task of tasks) {
if (task.priority === Priority.CRITICAL) {
assignments[task.id] = 'critical-agent';
} else if (task.priority === Priority.HIGH) {
assignments[task.id] = 'high-priority-agent';
} else if (task.priority <= Priority.LOW) {
assignments[task.id] = 'standard-agent';
} else {
assignments[task.id] = 'background-agent';
}
}
return {
routedTasks: routed,
agentAssignments: assignments,
metadata: {
criticalCount: routed.critical.length,
highCount: routed.high.length,
standardCount: routed.standard.length,
backgroundCount: routed.background.length,
}
};
});
// 専用リソースでクリティカルタスクを処理
workflow.addNode('process-critical', async (state) => {
const { routedTasks } = state;
const criticalTasks = routedTasks.critical || [];
if (criticalTasks.length === 0) {
return { metadata: { criticalStatus: 'none' } };
}
// 高性能モデルで処理
const results = await Promise.all(
criticalTasks.map(async task => {
const response = await this.criticalAgent.invoke([
new HumanMessage(`クリティカルタスク: ${task.description}`)
]);
return {
taskId: task.id,
result: response.content,
processingTime: Date.now()
};
})
);
return {
completedTasks: criticalTasks.map(t => t.id),
metadata: {
criticalProcessed: results.length,
criticalResults: results
}
};
});
// 標準タスクをバッチで処理
workflow.addNode('process-standard', async (state) => {
const { routedTasks } = state;
const standardTasks = routedTasks.standard || [];
if (standardTasks.length === 0) {
return { metadata: { standardStatus: 'none' } };
}
// 効率的にバッチ処理
const batchSize = 5;
const completed: string[] = [];
for (let i = 0; i < standardTasks.length; i += batchSize) {
const batch = standardTasks.slice(i, i + batchSize);
const batchPromises = batch.map(task =>
this.standardAgent.invoke([
new HumanMessage(`タスク: ${task.description}`)
])
);
await Promise.all(batchPromises);
completed.push(...batch.map(t => t.id));
}
return {
completedTasks: completed,
metadata: { standardProcessed: completed.length }
};
});
// バックグラウンドタスクをスケジュール
workflow.addNode('schedule-background', async (state) => {
const { routedTasks } = state;
const backgroundTasks = routedTasks.background || [];
if (backgroundTasks.length === 0) {
return { metadata: { backgroundStatus: 'none' } };
}
// オフピーク処理のためにキューに入れる
const scheduled = backgroundTasks.map(task => ({
taskId: task.id,
scheduledFor: new Date(Date.now() + 3600000), // 1時間後
agent: 'background-agent'
}));
return {
metadata: {
backgroundScheduled: scheduled.length,
scheduledTasks: scheduled
}
};
});
// タスクの存在に基づく条件付きルーティング
workflow.addConditionalEdges('route', (state) => {
const { routedTasks } = state;
const edges = [];
if (routedTasks.critical?.length > 0) edges.push('process-critical');
if (routedTasks.standard?.length > 0) edges.push('process-standard');
if (routedTasks.background?.length > 0) edges.push('schedule-background');
return edges.length > 0 ? edges : [END];
});
workflow.addEdge('process-critical', END);
workflow.addEdge('process-standard', END);
workflow.addEdge('schedule-background', END);
workflow.setEntryPoint('route');
return workflow.compile();
}
}
各優先度レベルに対して異なるモデルと処理戦略を持つ洗練されたマルチエージェントルーティングを実装します。
3. 動的優先度調整システム
// lib/priority/dynamic-adjuster.ts
import { Task, Priority } from '@/types/priority';
import { mean, standardDeviation } from 'es-toolkit';
import { kv } from '@vercel/kv';
interface TaskMetrics {
taskId: string;
attempts: number;
failureRate: number;
averageProcessingTime: number;
lastAttempt: Date;
ageInHours: number;
}
export class DynamicPriorityAdjuster {
private readonly AGING_FACTOR = 0.1; // 1時間あたりの優先度ブースト
private readonly FAILURE_THRESHOLD = 0.3; // 30%の失敗率でブーストをトリガー
private readonly STARVATION_HOURS = 4; // 反飢餓状態が開始するまでの時間
async adjustPriority(task: Task, metrics: TaskMetrics): Promise<Priority> {
let priorityScore = task.priority;
// 年齢ベースの調整(飢餓防止)
if (metrics.ageInHours > this.STARVATION_HOURS) {
const ageBoost = Math.floor(
(metrics.ageInHours - this.STARVATION_HOURS) * this.AGING_FACTOR
);
priorityScore = Math.max(0, priorityScore - ageBoost);
}
// 失敗率の調整
if (metrics.failureRate > this.FAILURE_THRESHOLD) {
priorityScore = Math.max(0, priorityScore - 1);
}
// 期限の近接性の調整
if (task.deadline) {
const hoursUntilDeadline =
(task.deadline.getTime() - Date.now()) / 3600000;
if (hoursUntilDeadline < 24) {
priorityScore = Priority.HIGH;
}
if (hoursUntilDeadline < 4) {
priorityScore = Priority.CRITICAL;
}
}
// 調整履歴を保存
await this.storeAdjustment(task.id, task.priority, priorityScore);
return priorityScore as Priority;
}
async calculateSystemLoad(): Promise<{
load: 'low' | 'normal' | 'high' | 'critical';
metrics: Record<string, number>;
}> {
// KVストアから最近のタスクメトリクスを取得
const recentTasks = await kv.lrange('task-queue', 0, 100);
if (!recentTasks || recentTasks.length === 0) {
return { load: 'low', metrics: {} };
}
const priorities = recentTasks.map((t: any) => t.priority);
const criticalCount = priorities.filter(p => p === Priority.CRITICAL).length;
const highCount = priorities.filter(p => p === Priority.HIGH).length;
const avgPriority = mean(priorities);
const stdDev = standardDeviation(priorities);
// システムロードを決定
let load: 'low' | 'normal' | 'high' | 'critical';
if (criticalCount > 10 || avgPriority < 1) {
load = 'critical';
} else if (highCount > 20 || avgPriority < 2) {
load = 'high';
} else if (avgPriority < 3) {
load = 'normal';
} else {
load = 'low';
}
return {
load,
metrics: {
averagePriority: avgPriority,
standardDeviation: stdDev,
criticalTasks: criticalCount,
highTasks: highCount,
totalTasks: recentTasks.length,
}
};
}
async optimizeThroughput(tasks: Task[]): Promise<Task[]> {
const systemLoad = await this.calculateSystemLoad();
// 負荷に基づいて異なる戦略を適用
switch (systemLoad.load) {
case 'critical':
// クリティカルタスクのみに焦点を当てる
return tasks
.filter(t => t.priority <= Priority.HIGH)
.sort((a, b) => a.priority - b.priority);
case 'high':
// クリティカルと高優先度のバランス
return tasks
.filter(t => t.priority <= Priority.MEDIUM)
.sort((a, b) => {
// いくつかの中優先度で高優先度をインターリーブ
if (a.priority === b.priority) return 0;
if (a.priority === Priority.CRITICAL) return -1;
if (b.priority === Priority.CRITICAL) return 1;
// 高と中を3:1の比率で混合
return Math.random() > 0.25 ?
a.priority - b.priority :
b.priority - a.priority;
});
case 'normal':
// 標準的な優先順位付け
return tasks.sort((a, b) => a.priority - b.priority);
case 'low':
// すべてのタスクを処理、バックグラウンド作業を強化
return tasks.sort((a, b) => {
// バックグラウンドタスクにチャンスを与える
if (a.priority === Priority.BACKGROUND &&
b.priority === Priority.LOW) {
return Math.random() > 0.7 ? -1 : 1;
}
return a.priority - b.priority;
});
}
}
private async storeAdjustment(
taskId: string,
originalPriority: Priority,
adjustedPriority: Priority
): Promise<void> {
const adjustment = {
taskId,
originalPriority: Priority[originalPriority],
adjustedPriority: Priority[adjustedPriority],
timestamp: new Date().toISOString(),
reason: this.determineAdjustmentReason(originalPriority, adjustedPriority)
};
await kv.lpush('priority-adjustments', adjustment);
await kv.ltrim('priority-adjustments', 0, 999); // 最後の1000件を保持
}
private determineAdjustmentReason(
original: Priority,
adjusted: Priority
): string {
if (adjusted < original) {
return 'priority_increased';
} else if (adjusted > original) {
return 'priority_decreased';
}
return 'no_change';
}
}
エージング防止、失敗ベースのブースティング、負荷認識のスループット最適化を備えた動的優先度調整を実装します。
4. ストリーミング優先度更新
// app/api/priority/stream/route.ts
import { DynamicPriorityAdjuster } from '@/lib/priority/dynamic-adjuster';
import { PriorityRouter } from '@/lib/agents/priority-router';
import { Task } from '@/types/priority';
export const runtime = 'nodejs';
export const maxDuration = 300;
export async function GET(req: Request) {
const encoder = new TextEncoder();
const stream = new TransformStream();
const writer = stream.writable.getWriter();
const adjuster = new DynamicPriorityAdjuster();
const router = new PriorityRouter();
const workflow = router.createRouterWorkflow();
// 毎秒優先度の更新をストリーム
const interval = setInterval(async () => {
try {
const systemLoad = await adjuster.calculateSystemLoad();
const update = {
timestamp: new Date().toISOString(),
load: systemLoad.load,
metrics: systemLoad.metrics,
recommendation: getRecommendation(systemLoad.load)
};
await writer.write(
encoder.encode(`data: ${JSON.stringify(update)}\n\n`)
);
} catch (error) {
console.error('ストリームエラー:', error);
}
}, 1000);
// クライアント切断時のクリーンアップ
req.signal.addEventListener('abort', () => {
clearInterval(interval);
writer.close();
});
return new Response(stream.readable, {
headers: {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
},
});
}
function getRecommendation(load: string): string {
switch (load) {
case 'critical':
return 'システム過負荷。エージェントのスケーリングまたは低優先度タスクの延期を検討してください。';
case 'high':
return '高負荷検出。注意深く監視し、スケールの準備をしてください。';
case 'normal':
return 'システムは正常に動作しています。';
case 'low':
return '低利用率。メンテナンスやバックグラウンドタスクに最適な時間です。';
default:
return '不明なシステム状態。';
}
}
SSEストリーミングによるリアルタイム優先度システム監視を提供し、ダッシュボードでライブシステムロードと推奨事項を表示できるようにします。
5. フロントエンド優先度ダッシュボード
// components/PriorityDashboard.tsx
'use client';
import { useQuery, useMutation } from '@tanstack/react-query';
import { useState, useEffect } from 'react';
import { Task, Priority } from '@/types/priority';
import { groupBy, sortBy } from 'es-toolkit';
export default function PriorityDashboard() {
const [tasks, setTasks] = useState<Task[]>([]);
const [systemLoad, setSystemLoad] = useState<any>(null);
const [newTask, setNewTask] = useState('');
// 新しいタスクを送信
const submitTask = useMutation({
mutationFn: async (description: string) => {
const res = await fetch('/api/priority', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ task: description }),
});
return res.json();
},
onSuccess: (data) => {
if (data.taskAdded) {
setTasks(prev => [...prev, data.taskAdded]);
}
setNewTask('');
},
});
// システム更新をストリーム
useEffect(() => {
const eventSource = new EventSource('/api/priority/stream');
eventSource.onmessage = (event) => {
const data = JSON.parse(event.data);
setSystemLoad(data);
};
return () => eventSource.close();
}, []);
// 優先度別にタスクをグループ化
const groupedTasks = groupBy(tasks, t => Priority[t.priority]);
return (
<div className="p-6 max-w-6xl mx-auto">
<div className="mb-8">
<h1 className="text-3xl font-bold mb-4">優先度管理システム</h1>
{/* システムロードインジケーター */}
{systemLoad && (
<div className={`alert ${
systemLoad.load === 'critical' ? 'alert-error' :
systemLoad.load === 'high' ? 'alert-warning' :
'alert-info'
} mb-4`}>
<div>
<h3 className="font-bold">システム負荷: {systemLoad.load.toUpperCase()}</h3>
<p className="text-sm">{systemLoad.recommendation}</p>
<div className="stats stats-horizontal mt-2 bg-transparent">
<div className="stat px-2">
<div className="stat-title text-xs">クリティカル</div>
<div className="stat-value text-lg">
{systemLoad.metrics?.criticalTasks || 0}
</div>
</div>
<div className="stat px-2">
<div className="stat-title text-xs">高</div>
<div className="stat-value text-lg">
{systemLoad.metrics?.highTasks || 0}
</div>
</div>
<div className="stat px-2">
<div className="stat-title text-xs">合計</div>
<div className="stat-value text-lg">
{systemLoad.metrics?.totalTasks || 0}
</div>
</div>
</div>
</div>
</div>
)}
{/* タスク入力 */}
<div className="form-control mb-6">
<div className="input-group">
<input
type="text"
placeholder="新しいタスクを説明..."
className="input input-bordered flex-1"
value={newTask}
onChange={(e) => setNewTask(e.target.value)}
onKeyPress={(e) => {
if (e.key === 'Enter' && newTask.trim()) {
submitTask.mutate(newTask);
}
}}
/>
<button
className="btn btn-primary"
onClick={() => newTask.trim() && submitTask.mutate(newTask)}
disabled={submitTask.isPending || !newTask.trim()}
>
{submitTask.isPending ? (
<span className="loading loading-spinner"></span>
) : (
'タスクを追加'
)}
</button>
</div>
</div>
{/* 優先度レーン */}
<div className="grid grid-cols-1 md:grid-cols-2 lg:grid-cols-4 gap-4">
{Object.entries(Priority)
.filter(([key]) => isNaN(Number(key)))
.map(([priorityName, priorityValue]) => (
<div key={priorityName} className="card bg-base-100 shadow-xl">
<div className="card-body p-4">
<h2 className={`card-title text-sm ${
priorityName === 'CRITICAL' ? 'text-error' :
priorityName === 'HIGH' ? 'text-warning' :
priorityName === 'MEDIUM' ? 'text-info' :
'text-base-content'
}`}>
{priorityName}
<div className="badge badge-sm">
{groupedTasks[priorityName]?.length || 0}
</div>
</h2>
<div className="space-y-2 mt-2">
{(groupedTasks[priorityName] || [])
.slice(0, 5)
.map(task => (
<div
key={task.id}
className="text-xs p-2 bg-base-200 rounded"
>
<div className="font-medium truncate">
{task.description}
</div>
{task.deadline && (
<div className="text-xs opacity-70 mt-1">
期限: {new Date(task.deadline).toLocaleDateString()}
</div>
)}
</div>
))}
{groupedTasks[priorityName]?.length > 5 && (
<div className="text-xs opacity-50 text-center">
+{groupedTasks[priorityName].length - 5} 件以上
</div>
)}
</div>
</div>
</div>
))}
</div>
</div>
</div>
);
}
リアルタイムの優先度レーン、システムロードインジケーター、視覚的なフィードバックによるタスク送信を表示するインタラクティブなダッシュボードを作成します。
まとめ
エージェント設計パターンにおける優先順位付けは、現実世界の制約に適応しながら、人間の意思決定を反映しつつマシンスケールで動作するインテリジェントなタスク管理を導入することで、AIシステムが複雑なワークロードを処理する方法を変革します。ヒープベースの優先度キュー、LLMを活用した優先度割り当て、動的調整アルゴリズムの組み合わせにより、複数の基準によるソートを備えた効率的なO(log n)操作を提供する基本実装から、重み付けスコアリング、マルチエージェントルーティング、リアルタイムロード監視などの本番環境対応機能を含む高度な例まで、システムを作成しました。
これらのパターンは、ステートレスな実行と迅速なスケーリングが不可欠なVercelのようなサーバーレス環境で優れています。LangGraphのステートフルワークフローとes-toolkitの最適化されたユーティリティを活用することで、システムは高負荷下でもパフォーマンスを維持しながら、変化するビジネス条件に基づいて優先順位を調整する柔軟性を提供します。重要な洞察は、効果的な優先順位付けが単にタスクを順序付けることだけでなく、緊急度、影響、リソースの可用性のバランスを取って、システム全体の効率を最大化する適応的なシステムを作成することにあるということです。