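Draft: "Agentic Design Patterns - Exception Handling"
Learn techniques for building fault-tolerant AI agents with TypeScript, LangChain, LangGraph, and the Vercel serverless platform. These agents handle errors gracefully, recover from failures, and stay reliable in production.
Mental Model: An Adaptive Emergency Response System
Think of exception handling in an agent as a hospital's emergency response system. A hospital has different protocols for different severity levels (minor injury → nurse, moderate → doctor, critical → full trauma team), and an agent likewise needs tiered error responses. A network error is like a supply delay (wait and retry). An API failure is like an equipment failure (switch to backup equipment). A system crash is like a power outage (start the emergency generator and notify administrators). The system does more than react to problems: it learns from them, adjusts its protocols, and keeps the service running. Your agent should likewise detect problems early, respond in proportion to their severity, recover gracefully, and improve after every incident.
Basic Example: A Robust Agent with Error Boundaries
1. Define the Error Hierarchy and Recovery Strategies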
// lib/errors/exception-types.ts
import { z } from 'zod';
import { isError, isString } from 'es-toolkit';
export const ErrorLevelSchema = z.enum(['transient', 'recoverable', 'critical']);
export type ErrorLevel = z.infer<typeof ErrorLevelSchema>;
export const RecoveryStrategySchema = z.enum([
'retry',
'fallback',
'cache',
'degrade',
'escalate',
'abort'
]);
export type RecoveryStrategy = z.infer<typeof RecoveryStrategySchema>;
export interface ErrorContext {
timestamp: Date;
attempt: number;
maxAttempts: number;
strategy: RecoveryStrategy;
metadata?: Record<string, any>;
}
export class AgentException extends Error {
constructor(
message: string,
public level: ErrorLevel,
public strategy: RecoveryStrategy,
public context?: ErrorContext
) {
super(message);
this.name = 'AgentException';
}
static fromError(error: unknown, level: ErrorLevel = 'recoverable'): AgentException {
if (error instanceof AgentException) return error;
const message = isError(error) ? error.message :
isString(error) ? error :
'Unknown error occurred';
return new AgentException(message, level, 'retry');
}
}
export class ToolException extends AgentException {
constructor(
public toolName: string,
message: string,
strategy: RecoveryStrategy = 'fallback'
) {
super(`Tool [${toolName}]: ${message}`, 'recoverable', strategy);
this.name = 'ToolException';
}
}
export class ValidationException extends AgentException {
constructor(
message: string,
public validationErrors?: z.ZodIssue[]
) {
super(message, 'transient', 'retry');
this.name = 'ValidationException';
}
}
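This defines a structured error hierarchy in which each error carries an explicit recovery strategy and context information, so handlers can decide how to respond.
2. Build the Error Recovery Manager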
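The hierarchy above defines the vocabulary, but `fromError` labels every unknown failure as `recoverable`. The following is a minimal sketch of a classifier that assigns levels from the raw failure instead, following the triage analogy: network hiccups are retried, broken dependencies fall back, and problems that will not fix themselves escalate or abort. The `RawFailure` shape and the status-code thresholds are assumptions for illustration, not part of any SDK.

```typescript
// Mirrors the ErrorLevel / RecoveryStrategy unions defined above.
type ErrorLevel = 'transient' | 'recoverable' | 'critical';
type RecoveryStrategy = 'retry' | 'fallback' | 'cache' | 'degrade' | 'escalate' | 'abort';

export interface Classification {
  level: ErrorLevel;
  strategy: RecoveryStrategy;
}

// Hypothetical shape: many HTTP clients attach `status`; Node network errors carry `code`.
export interface RawFailure {
  status?: number;
  code?: string;
}

export function classifyFailure(failure: RawFailure): Classification {
  const { status, code } = failure;

  // Network-level hiccups (the "supply delay"): wait and retry.
  if (code === 'ECONNRESET' || code === 'ETIMEDOUT' || code === 'EAI_AGAIN') {
    return { level: 'transient', strategy: 'retry' };
  }
  if (status !== undefined) {
    // Rate limits and gateway errors usually clear up on their own (assumed convention).
    if (status === 429 || status === 502 || status === 503 || status === 504) {
      return { level: 'transient', strategy: 'retry' };
    }
    // Other server errors: the dependency is broken, so switch to a backup.
    if (status >= 500) {
      return { level: 'recoverable', strategy: 'fallback' };
    }
    // Credential problems will not fix themselves: escalate to a human.
    if (status === 401 || status === 403) {
      return { level: 'critical', strategy: 'escalate' };
    }
    // Remaining client errors: the request itself is wrong, so retrying is pointless.
    if (status >= 400) {
      return { level: 'critical', strategy: 'abort' };
    }
  }
  // Unknown failures: treat as recoverable and try a fallback.
  return { level: 'recoverable', strategy: 'fallback' };
}
```

A caller could use the result to construct `new AgentException(message, level, strategy)` instead of relying on the `fromError` default.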
// lib/recovery/recovery-manager.ts
import { delay } from 'es-toolkit';
import {
AgentException,
ErrorLevel,
RecoveryStrategy,
ErrorContext
} from '@/lib/errors/exception-types';
interface RecoveryConfig {
maxRetries: number;
baseDelay: number;
maxDelay: number;
timeout: number;
fallbackHandlers: Map<string, () => Promise<any>>;
}
export class RecoveryManager {
private errorHistory: AgentException[] = [];
private config: RecoveryConfig;
constructor(config: Partial<RecoveryConfig> = {}) {
this.config = {
maxRetries: config.maxRetries ?? 3,
baseDelay: config.baseDelay ?? 1000,
maxDelay: config.maxDelay ?? 30000,
timeout: config.timeout ?? 280000, // stay below the route's maxDuration (300 s) on Vercel
fallbackHandlers: config.fallbackHandlers ?? new Map()
};
}
async executeWithRecovery<T>(
operation: () => Promise<T>,
operationName: string
): Promise<T> {
const startTime = Date.now();
let lastError: AgentException | null = null;
for (let attempt = 1; attempt <= this.config.maxRetries; attempt++) {
try {
// Check the overall time budget
if (Date.now() - startTime > this.config.timeout) {
throw new AgentException(
'Operation timeout exceeded',
'critical',
'abort'
);
}
// Run the operation, racing it against the remaining time budget
const result = await Promise.race([
operation(),
this.createTimeout(this.config.timeout - (Date.now() - startTime))
]);
// Log when a retry (rather than the first attempt) succeeded
if (attempt > 1) {
console.log(`Recovery successful for ${operationName} on attempt ${attempt}`);
}
return result;
} catch (error) {
lastError = AgentException.fromError(error);
lastError.context = {
timestamp: new Date(),
attempt,
maxAttempts: this.config.maxRetries,
strategy: this.determineStrategy(lastError, attempt)
};
this.errorHistory.push(lastError);
// Apply the recovery strategy
const recovered = await this.applyStrategy(
lastError,
operationName,
attempt
);
if (recovered !== null) {
return recovered;
}
// Wait with capped exponential backoff before the next attempt
if (attempt < this.config.maxRetries) {
const delayMs = Math.min(
this.config.baseDelay * Math.pow(2, attempt - 1),
this.config.maxDelay
);
await delay(delayMs);
}
}
}
throw lastError || new AgentException(
`${operationName} failed after ${this.config.maxRetries} attempts`,
'critical',
'escalate'
);
}
private determineStrategy(
error: AgentException,
attempt: number
): RecoveryStrategy {
// Critical errors escalate immediately
if (error.level === 'critical') return 'escalate';
// Transient errors are retried first
if (error.level === 'transient' && attempt < this.config.maxRetries) {
return 'retry';
}
// Recoverable errors try a fallback
if (error.level === 'recoverable') {
return 'fallback';
}
// Otherwise, degrade the service
return 'degrade';
}
private async applyStrategy<T>(
error: AgentException,
operationName: string,
attempt: number
): Promise<T | null> {
const strategy = error.context?.strategy || error.strategy;
switch (strategy) {
case 'fallback': {
// Block scope keeps the `const` local to this case clause
const fallback = this.config.fallbackHandlers.get(operationName);
if (fallback) {
console.log(`Applying fallback for ${operationName}`);
return await fallback();
}
break;
}
case 'cache':
// A cached result would be returned here; caching is not implemented in this example
console.log(`Would return cached result for ${operationName}`);
break;
case 'degrade':
console.log(`Degrading service for ${operationName}`);
return null;
case 'escalate':
console.error(`Escalating error for ${operationName}:`, error.message);
throw error;
case 'abort':
console.error(`Aborting ${operationName}`);
throw error;
}
return null;
}
private createTimeout(ms: number): Promise<never> {
return new Promise((_, reject) =>
setTimeout(() => reject(new AgentException(
'Operation timeout',
'transient',
'retry'
)), ms)
);
}
getErrorHistory(): AgentException[] {
return [...this.errorHistory];
}
clearHistory(): void {
this.errorHistory = [];
}
}
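The recovery manager chooses a recovery strategy from each error's level, retries with capped exponential backoff, invokes registered fallbacks, and escalates critical errors immediately.
3. Create an Error-Aware Agent with Tools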
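`RecoveryManager` waits `baseDelay * 2^(attempt - 1)` milliseconds between attempts, capped at `maxDelay`. A common refinement that the original code does not include is "full jitter": the wait is a random value between 0 and that capped delay, so many clients that fail at the same moment do not retry in lockstep against a recovering API. The sketch below is standalone and does not use `RecoveryManager` or es-toolkit; the injectable `random` parameter exists only to make the delay testable.

```typescript
// Capped exponential backoff with full jitter (the jitter is an addition, not in RecoveryManager).
export function backoffDelay(
  attempt: number,                   // 1-based number of the attempt that just failed
  baseDelayMs: number,
  maxDelayMs: number,
  random: () => number = Math.random // injectable for deterministic tests
): number {
  // Same curve as RecoveryManager: base * 2^(attempt - 1), capped at maxDelay
  const capped = Math.min(baseDelayMs * 2 ** (attempt - 1), maxDelayMs);
  // Full jitter: a uniformly random wait in [0, capped)
  return Math.floor(random() * capped);
}

// Generic retry loop that uses the jittered delay between attempts.
export async function retryWithBackoff<T>(
  operation: (attempt: number) => Promise<T>,
  maxAttempts: number,
  baseDelayMs: number,
  maxDelayMs: number
): Promise<T> {
  let lastError: unknown;
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      return await operation(attempt);
    } catch (error) {
      lastError = error;
      if (attempt < maxAttempts) {
        const ms = backoffDelay(attempt, baseDelayMs, maxDelayMs);
        await new Promise((resolve) => setTimeout(resolve, ms));
      }
    }
  }
  // Every attempt failed: surface the last error to the caller
  throw lastError;
}
```

Jitter trades a slightly longer average wait for a lower chance that synchronized retries keep overloading the same dependency.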
// lib/agents/error-aware-agent.ts
import { DynamicStructuredTool } from '@langchain/core/tools';
import { z } from 'zod';
import { RecoveryManager } from '@/lib/recovery/recovery-manager';
import { ToolException, ValidationException } from '@/lib/errors/exception-types';
// Safe tool wrapper: validates input, then runs the tool with retries and an optional fallback
export function createSafeTool(
name: string,
description: string,
schema: z.AnyZodObject, // DynamicStructuredTool expects an object schema
implementation: (input: any) => Promise<any>,
fallback?: () => Promise<any>
) {
return new DynamicStructuredTool({
name,
description,
schema,
func: async (input) => {
const recoveryManager = new RecoveryManager({
maxRetries: 2,
fallbackHandlers: fallback ?
new Map([[name, fallback]]) :
new Map()
});
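try {
// Validate input (the tool also validates; this keeps the error typed)
const validated = schema.safeParse(input);
if (!validated.success) {
throw new ValidationException(
'Invalid tool input',
validated.error.issues
);
}
// Execute with recovery, then serialize: tool output reaches the model as text
const result = await recoveryManager.executeWithRecovery(
() => implementation(validated.data),
name
);
return JSON.stringify(result);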
} catch (error) {
console.error(`Tool ${name} failed:`, error);
throw new ToolException(name,
error instanceof Error ? error.message : 'Unknown error'
);
}
}
});
}
// Example tools with built-in error handling
export function createResilientTools() {
const weatherTool = createSafeTool(
'get_weather',
'Get current weather for a location',
z.object({
location: z.string().min(1),
units: z.enum(['celsius', 'fahrenheit']).default('celsius')
}),
async (input) => {
// Simulate occasional failures
if (Math.random() < 0.3) {
throw new Error('Weather API unavailable');
}
return {
location: input.location,
temperature: 22,
units: input.units,
conditions: 'Partly cloudy'
};
},
async () => ({
location: 'Unknown',
temperature: 20,
units: 'celsius',
conditions: 'Data unavailable',
source: 'fallback'
})
);
const calculatorTool = createSafeTool(
'calculator',
'Perform mathematical calculations',
z.object({
expression: z.string(),
precision: z.number().int().min(0).max(10).default(2)
}),
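async (input) => {
// `new Function` would run any JavaScript the model sends, so first allow only
// digits, whitespace, arithmetic operators, decimal points, and parentheses
if (!/^[\d\s+\-*\/%().]+$/.test(input.expression)) {
throw new Error('Invalid mathematical expression');
}
let result: unknown;
try {
result = new Function(`return (${input.expression});`)();
} catch {
throw new Error('Invalid mathematical expression');
}
if (typeof result !== 'number' || !Number.isFinite(result)) {
throw new Error('Expression did not evaluate to a finite number');
}
return {
expression: input.expression,
result: Number(result.toFixed(input.precision))
};
}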
);
const searchTool = createSafeTool(
'web_search',
'Search the web for information',
z.object({
query: z.string().min(1).max(200),
maxResults: z.number().int().min(1).max(10).default(5)
}),
async (input) => {
// Simulate a search that can fail
if (Math.random() < 0.2) {
throw new Error('Search service timeout');
}
return {
query: input.query,
results: Array.from({ length: input.maxResults }, (_, i) => ({
title: `Result ${i + 1} for "${input.query}"`,
snippet: `Sample content for result ${i + 1}`,
url: `https://example.com/result-${i + 1}`
}))
};
},
async () => ({
query: 'fallback',
results: [],
error: 'Search unavailable, please try again later'
})
);
return [weatherTool, calculatorTool, searchTool];
}
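Error-aware tools combine input validation, retry-based recovery, and fallback handlers for reliable execution.
4. Implement the Agent API with an Error Boundary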
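The calculator tool evaluates the expression with the `Function` constructor, which executes arbitrary JavaScript unless the input is restricted first. A stricter alternative is to parse the arithmetic directly and never hand model output to the JavaScript engine. The sketch below is a small recursive-descent evaluator. The supported grammar (`+ - * / %`, parentheses, unary minus, decimals) is an assumption about what the calculator needs; it is not a LangChain feature.

```typescript
// Evaluates arithmetic without executing code: tokenize, then recursive descent.
export function evaluateArithmetic(expression: string): number {
  // Numbers (optionally with decimals), operators, or any other single non-space char
  const tokens = expression.match(/\d+(?:\.\d+)?|\.\d+|[+\-*/%()]|\S/g) ?? [];
  let pos = 0;
  const peek = (): string | undefined => tokens[pos];
  const next = (): string | undefined => tokens[pos++];

  // expr := term (('+' | '-') term)*
  function parseExpr(): number {
    let value = parseTerm();
    while (peek() === '+' || peek() === '-') {
      const op = next();
      const rhs = parseTerm();
      value = op === '+' ? value + rhs : value - rhs;
    }
    return value;
  }

  // term := factor (('*' | '/' | '%') factor)*
  function parseTerm(): number {
    let value = parseFactor();
    while (peek() === '*' || peek() === '/' || peek() === '%') {
      const op = next();
      const rhs = parseFactor();
      if (op === '*') value *= rhs;
      else if (op === '/') value /= rhs;
      else value %= rhs;
    }
    return value;
  }

  // factor := '-' factor | '(' expr ')' | number
  function parseFactor(): number {
    const token = next();
    if (token === '-') return -parseFactor();
    if (token === '(') {
      const value = parseExpr();
      if (next() !== ')') throw new Error('Missing closing parenthesis');
      return value;
    }
    if (token !== undefined && /^(\d+(\.\d+)?|\.\d+)$/.test(token)) {
      return Number(token);
    }
    // Identifiers, quotes, brackets, etc. are rejected here
    throw new Error(`Unexpected token: ${token ?? 'end of input'}`);
  }

  const result = parseExpr();
  if (pos !== tokens.length) throw new Error(`Unexpected token: ${tokens[pos]}`);
  if (!Number.isFinite(result)) throw new Error('Result is not a finite number');
  return result;
}
```

In the tool, `evaluateArithmetic(input.expression)` would replace the `new Function(...)` call. Its errors already carry messages that the recovery manager can report.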
// app/api/error-aware-agent/route.ts
import { NextResponse } from 'next/server';
import { ChatGoogleGenerativeAI } from '@langchain/google-genai';
import { createResilientTools } from '@/lib/agents/error-aware-agent';
import { RecoveryManager } from '@/lib/recovery/recovery-manager';
import { AgentException } from '@/lib/errors/exception-types';
import { createReactAgent } from '@langchain/langgraph/prebuilt';
import { HumanMessage } from '@langchain/core/messages';
export const runtime = 'nodejs';
export const maxDuration = 300;
export async function POST(req: Request) {
const recoveryManager = new RecoveryManager();
try {
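const { message } = await req.json();
if (!message || typeof message !== 'string') {
// Bad input is a client error: answer 400 directly instead of treating it as a server failure
return NextResponse.json(
{ success: false, error: 'Invalid request: message is required' },
{ status: 400 }
);
}
// Run the agent with recovery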
const result = await recoveryManager.executeWithRecovery(
async () => {
const model = new ChatGoogleGenerativeAI({
model: 'gemini-2.5-flash',
temperature: 0.3,
maxOutputTokens: 2048
});
const tools = createResilientTools();
const agent = createReactAgent({
llm: model,
tools,
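messageModifier: `You are a helpful assistant that can recover from errors.
If a tool fails, acknowledge it gracefully and try an alternative where possible.
Always give a useful response, even when tools are unavailable.`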
});
const response = await agent.invoke({
messages: [new HumanMessage(message)]
});
return response.messages[response.messages.length - 1].content;
},
'agent-execution'
);
const errorHistory = recoveryManager.getErrorHistory();
return NextResponse.json({
success: true,
result,
recoveryAttempts: errorHistory.length,
errors: errorHistory.map(e => ({
message: e.message,
level: e.level,
strategy: e.strategy,
attempt: e.context?.attempt
}))
});
} catch (error) {
const agentError = AgentException.fromError(error, 'critical');
console.error('Agent execution failed:', agentError);
return NextResponse.json(
{
success: false,
error: agentError.message,
level: agentError.level,
strategy: agentError.strategy,
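// Error.message is non-enumerable, so copy fields explicitly before serializing
errorHistory: recoveryManager.getErrorHistory().map(e => ({
message: e.message,
level: e.level,
strategy: e.strategy,
attempt: e.context?.attempt
}))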
},
{ status: agentError.level === 'critical' ? 500 : 503 }
);
}
}
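An API route that wraps agent execution in an error boundary and reports detailed error tracking for every recovery attempt.
5. Create a Frontend with Error Feedback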
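When the route returns errors as JSON, note that `message` and `stack` on an `Error` are non-enumerable own properties. As a result, `JSON.stringify(new Error('x'))` produces `{}`, and a raw error object arrives at the client empty. The sketch below copies the fields explicitly and picks a status code. The mapping (400 for bad input, 503 for non-critical failures, 500 for critical ones) is an assumed convention that extends the route's own 500/503 split.

```typescript
type ErrorLevel = 'transient' | 'recoverable' | 'critical';

// Any Error, optionally carrying the AgentException-style fields used in this article.
interface LeveledError extends Error {
  level?: ErrorLevel;
  strategy?: string;
}

export interface ErrorPayload {
  status: number;
  body: { success: false; error: string; level: ErrorLevel; strategy?: string };
}

export function toErrorPayload(error: unknown, isBadRequest = false): ErrorPayload {
  // Normalize non-Error throws (strings, numbers, ...) into an Error
  const err: LeveledError =
    error instanceof Error ? error : new Error(typeof error === 'string' ? error : 'Unknown error');
  // Errors without a level are treated as critical
  const level: ErrorLevel = err.level ?? 'critical';
  // Client mistakes should not look like server outages
  const status = isBadRequest ? 400 : level === 'critical' ? 500 : 503;
  // Copy fields explicitly so JSON.stringify keeps the message
  return {
    status,
    body: { success: false, error: err.message, level, strategy: err.strategy }
  };
}
```

In a route handler, the result would be passed to `NextResponse.json(payload.body, { status: payload.status })`.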
// components/ErrorAwareChat.tsx
'use client';
import { useState } from 'react';
import { useMutation } from '@tanstack/react-query';
interface ChatResponse {
success: boolean;
result?: string;
error?: string;
level?: string;
recoveryAttempts?: number;
errors?: Array<{
message: string;
level: string;
strategy: string;
attempt?: number;
}>;
}
export default function ErrorAwareChat() {
const [message, setMessage] = useState('');
const [chatHistory, setChatHistory] = useState<Array<{
role: 'user' | 'assistant' | 'error';
content: string;
metadata?: any;
}>>([]);
const sendMessage = useMutation<ChatResponse, Error, string>({
mutationFn: async (message: string) => {
const response = await fetch('/api/error-aware-agent', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ message }),
});
const data = await response.json();
if (!response.ok && !data.success) {
throw new Error(data.error || 'Request failed');
}
return data;
},
onSuccess: (data) => {
if (data.success) {
setChatHistory(prev => [
...prev,
{ role: 'assistant', content: data.result!, metadata: data }
]);
}
},
onError: (error) => {
setChatHistory(prev => [
...prev,
{
role: 'error',
content: `Error: ${error.message}`,
metadata: { timestamp: new Date() }
}
]);
}
});
const handleSubmit = (e: React.FormEvent) => {
e.preventDefault();
if (!message.trim()) return;
setChatHistory(prev => [...prev, { role: 'user', content: message }]);
sendMessage.mutate(message);
setMessage('');
};
const getRecoveryBadge = (attempts?: number) => {
if (!attempts) return null;
const badgeClass = attempts > 2 ? 'badge-warning' : 'badge-info';
return (
<div className={`badge ${badgeClass} badge-sm`}>
{attempts} recovery attempt(s)
</div>
);
};
return (
<div className="card w-full bg-base-100 shadow-xl">
<div className="card-body">
<h2 className="card-title">Error-Aware AI Assistant</h2>
{/* Chat history */}
<div className="h-96 overflow-y-auto space-y-2 p-4 bg-base-200 rounded-lg">
{chatHistory.map((msg, idx) => (
<div
key={idx}
className={`chat ${msg.role === 'user' ? 'chat-end' : 'chat-start'}`}
>
<div className="chat-header">
{msg.role === 'user' ? 'You' :
msg.role === 'assistant' ? 'Assistant' : 'System'}
</div>
<div className={`chat-bubble ${
msg.role === 'error' ? 'chat-bubble-error' :
msg.role === 'user' ? 'chat-bubble-primary' :
'chat-bubble-secondary'
}`}>
{msg.content}
{msg.metadata?.recoveryAttempts > 0 && (
<div className="mt-2">
{getRecoveryBadge(msg.metadata.recoveryAttempts)}
</div>
)}
</div>
{msg.metadata?.errors && msg.metadata.errors.length > 0 && (
<div className="chat-footer opacity-50 text-xs">
Recovered via: {msg.metadata.errors[0].strategy}
</div>
)}
</div>
))}
{sendMessage.isPending && (
<div className="chat chat-start">
<div className="chat-bubble chat-bubble-secondary">
<span className="loading loading-dots loading-sm"></span>
</div>
</div>
)}
</div>
{/* Input form */}
<form onSubmit={handleSubmit} className="join w-full mt-4">
<input
type="text"
className="input input-bordered join-item flex-1"
placeholder="Ask me anything..."
value={message}
onChange={(e) => setMessage(e.target.value)}
disabled={sendMessage.isPending}
/>
<button
type="submit"
className="btn btn-primary join-item"
disabled={sendMessage.isPending || !message.trim()}
>
Send
</button>
</form>
{/* Error state */}
{sendMessage.isError && (
<div className="alert alert-error mt-2">
<span>Failed to send the message. Please try again.</span>
</div>
)}
</div>
</div>
);
}
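A React component that shows visual feedback on recovery attempts and displays errors gracefully.
Advanced Example: A Self-Correcting Multi-Agent System
1. Implement Error Propagation Across the Agent Hierarchy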
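The chat component waits on `fetch` with no deadline, so a stalled agent call leaves the UI spinning indefinitely. The following is a minimal sketch of a client-side timeout built on the standard `AbortController`. The helper name `runWithTimeout` and the timeout value in the usage note are assumptions; the value would need tuning against the route's `maxDuration`.

```typescript
// Runs a signal-aware task and aborts it if it exceeds the deadline.
export async function runWithTimeout<T>(
  task: (signal: AbortSignal) => Promise<T>,
  timeoutMs: number
): Promise<T> {
  const controller = new AbortController();
  let timer: ReturnType<typeof setTimeout> | undefined;
  const timeout = new Promise<never>((_, reject) => {
    timer = setTimeout(() => {
      // Reject first so the race reports the timeout, then cancel the task
      reject(new Error(`Request timed out after ${timeoutMs} ms`));
      controller.abort();
    }, timeoutMs);
  });
  try {
    return await Promise.race([task(controller.signal), timeout]);
  } finally {
    clearTimeout(timer); // release the timer whether the task won or lost
  }
}
```

Inside `mutationFn`, the request could be wrapped as `runWithTimeout((signal) => fetch('/api/error-aware-agent', { method: 'POST', headers, body, signal }), 310_000)`. The timeout error then surfaces through the existing `onError` handler.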
// lib/agents/error-propagation.ts
import { EventEmitter } from 'events';
import { throttle } from 'es-toolkit';
export interface ErrorEvent {
agentId: string;
parentId?: string;
error: Error;
timestamp: Date;
handled: boolean;
propagated: boolean;
}
export class ErrorPropagationManager extends EventEmitter {
private errorChain: Map<string, ErrorEvent[]> = new Map();
private handlers: Map<string, (error: ErrorEvent) => Promise<boolean>> = new Map();
private parents: Map<string, string | undefined> = new Map();
// Throttle monitor notifications (at most one per second) to prevent flooding.
// A dedicated 'agent-error' event is used: re-emitting on the channel being
// listened to would loop, and EventEmitter throws on an 'error' event that has
// no listener.
private notifyMonitors = throttle((event: ErrorEvent) => {
this.emit('agent-error', event);
}, 1000);
registerAgent(
agentId: string,
parentId?: string,
errorHandler?: (error: ErrorEvent) => Promise<boolean>
) {
this.errorChain.set(agentId, []);
// Record the parent so unhandled errors can be escalated; root agents have none
this.parents.set(agentId, parentId);
if (errorHandler) {
this.handlers.set(agentId, errorHandler);
}
}
async reportError(agentId: string, error: Error): Promise<boolean> {
const event: ErrorEvent = {
agentId,
error,
timestamp: new Date(),
handled: false,
propagated: false
};
// Store in the error chain
const chain = this.errorChain.get(agentId) || [];
chain.push(event);
this.errorChain.set(agentId, chain);
this.notifyMonitors(event);
// Try the reporting agent's handler first, then escalate to each ancestor.
// Awaiting the handlers directly (instead of waiting a fixed 100 ms) makes the
// result reflect what the handlers decided, and root agents without a parent
// still get their own handler called.
let current: string | undefined = agentId;
while (current) {
if (await this.tryHandleError(current, event)) {
return true;
}
const parent = this.parents.get(current);
if (parent) {
event.propagated = true;
event.parentId = parent;
}
current = parent;
}
return false;
}
private async tryHandleError(
agentId: string,
event: ErrorEvent
): Promise<boolean> {
const handler = this.handlers.get(agentId);
if (handler) {
try {
event.handled = await handler(event);
return event.handled;
} catch (handlerError) {
console.error(`Handler for ${agentId} failed:`, handlerError);
return false;
}
}
return false;
}
getErrorChain(agentId: string): ErrorEvent[] {
return this.errorChain.get(agentId) || [];
}
clearErrorChain(agentId?: string) {
if (agentId) {
this.errorChain.delete(agentId);
} else {
this.errorChain.clear();
}
}
}
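Error propagation across the agent hierarchy: each error is first handled locally and escalated to the parent agent when the local handler does not resolve it.
2. Build a Self-Correcting Workflow with Validation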
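The propagation rule itself (local handler first, then each ancestor until one returns `true`) can be expressed as a plain function, independent of `EventEmitter`. The following is a minimal, dependency-free sketch that also records which agents an error passed through. The agent names in the test mirror the hierarchy registered in the next step; the handlers are hypothetical.

```typescript
type Handler = (error: Error) => Promise<boolean>;

export interface PropagationResult {
  handledBy: string | null; // null when no agent in the chain handled the error
  path: string[];           // agents visited, in order
}

export async function propagateError(
  agentId: string,
  error: Error,
  parents: Map<string, string | undefined>,
  handlers: Map<string, Handler>
): Promise<PropagationResult> {
  const path: string[] = [];
  let current: string | undefined = agentId;
  while (current !== undefined) {
    if (path.includes(current)) break; // guard against an accidental cycle in `parents`
    path.push(current);
    const handler = handlers.get(current);
    try {
      if (handler && (await handler(error))) {
        return { handledBy: current, path };
      }
    } catch {
      // A handler that throws counts as "not handled"; keep escalating
    }
    current = parents.get(current);
  }
  return { handledBy: null, path };
}
```

Returning the path makes escalation observable, which is useful when checking that an error was handled at the intended level of the hierarchy.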
// lib/workflows/self-correcting-workflow.ts
import { StateGraph, END } from '@langchain/langgraph';
import { ChatGoogleGenerativeAI } from '@langchain/google-genai';
import { BaseMessage, HumanMessage, SystemMessage } from '@langchain/core/messages';
import { z } from 'zod';
import { ErrorPropagationManager, ErrorEvent } from '@/lib/agents/error-propagation';
// Output validation schemas
const DataExtractionSchema = z.object({
entities: z.array(z.string()),
relationships: z.array(z.object({
source: z.string(),
target: z.string(),
type: z.string()
})),
metadata: z.record(z.any())
});
const AnalysisResultSchema = z.object({
insights: z.array(z.string()),
confidence: z.number().min(0).max(1),
recommendations: z.array(z.string())
});
interface WorkflowState {
messages: BaseMessage[];
stage: string;
extractedData?: z.infer<typeof DataExtractionSchema>;
analysisResult?: z.infer<typeof AnalysisResultSchema>;
validationErrors: string[];
correctionAttempts: number;
finalOutput?: string;
}
export class SelfCorrectingWorkflow {
private model: ChatGoogleGenerativeAI;
private errorManager: ErrorPropagationManager;
private maxCorrectionAttempts = 3;
constructor() {
this.model = new ChatGoogleGenerativeAI({
model: 'gemini-2.5-pro',
temperature: 0.2,
maxOutputTokens: 4096
});
this.errorManager = new ErrorPropagationManager();
this.setupErrorHandlers();
}
private setupErrorHandlers() {
// Register agents with their error handlers
this.errorManager.registerAgent('extraction', undefined, async (event) => {
console.log('Extraction error:', event.error.message);
return false; // let it propagate
});
this.errorManager.registerAgent('validation', 'extraction', async (event) => {
console.log('Validation error, attempting correction');
return true; // handled locally
});
this.errorManager.registerAgent('analysis', 'validation');
this.errorManager.registerAgent('synthesis', 'analysis');
}
createWorkflow() {
const workflow = new StateGraph<WorkflowState>({
channels: {
messages: {
value: (x: BaseMessage[], y: BaseMessage[]) => [...x, ...y],
default: () => []
},
stage: {
value: (x: string, y: string) => y || x,
default: () => 'extraction'
},
extractedData: {
value: (x: any, y: any) => y || x,
default: () => undefined
},
analysisResult: {
value: (x: any, y: any) => y || x,
default: () => undefined
},
validationErrors: {
value: (x: string[], y: string[]) => y ?? x, // replace, not append, so a node can clear errors by returning []
default: () => []
},
correctionAttempts: {
value: (x: number, y: number) => y ?? x,
default: () => 0
},
finalOutput: {
value: (x: string, y: string) => y || x,
default: () => undefined
}
}
});
// Extraction node with structured output
workflow.addNode('extraction', async (state) => {
try {
const prompt = `Extract the entities and relationships from the following text.
Return JSON with this structure:
{
"entities": ["entity1", "entity2"],
"relationships": [
{"source": "entity1", "target": "entity2", "type": "relation_type"}
],
"metadata": {}
}
Text: ${state.messages[0].content}`;
const response = await this.model.invoke([
new SystemMessage('You are a data extraction expert. Always return valid JSON.'),
new HumanMessage(prompt)
]);
// Parse and validate the JSON
const jsonStr = response.content.toString()
.replace(/```json\n?/g, '')
.replace(/```\n?/g, '');
const parsed = JSON.parse(jsonStr);
const validated = DataExtractionSchema.parse(parsed);
return {
extractedData: validated,
stage: 'validation'
};
} catch (error) {
await this.errorManager.reportError('extraction', error as Error);
return {
stage: 'correction',
validationErrors: [`Extraction failed: ${error}`]
};
}
});
// Validation node
workflow.addNode('validation', async (state) => {
if (!state.extractedData) {
return {
stage: 'correction',
validationErrors: ['No data to validate']
};
}
const errors: string[] = [];
// Check the quality of the extracted data
if (state.extractedData.entities.length === 0) {
errors.push('No entities extracted');
}
if (state.extractedData.relationships.length === 0 &&
state.extractedData.entities.length > 1) {
errors.push('Multiple entities but no relationships defined');
}
// Check for relationships that reference unknown entities
const entities = new Set(state.extractedData.entities);
for (const rel of state.extractedData.relationships) {
if (!entities.has(rel.source) || !entities.has(rel.target)) {
errors.push(`Relationship references an unknown entity: ${rel.source} -> ${rel.target}`);
}
}
if (errors.length > 0) {
return {
stage: 'correction',
validationErrors: errors
};
}
return { stage: 'analysis' };
});
// Self-correction node
workflow.addNode('correction', async (state) => {
if (state.correctionAttempts >= this.maxCorrectionAttempts) {
return {
stage: 'failure',
finalOutput: `Failed after ${this.maxCorrectionAttempts} correction attempts. Errors: ${state.validationErrors.join('; ')}`
};
}
const correctionPrompt = `The previous extraction had these errors:
${state.validationErrors.join('\n')}
Please redo the extraction for this text:
${state.messages[0].content}
Make sure every validation error is resolved.`;
try {
const response = await this.model.invoke([
new SystemMessage('You are a data extraction expert. Fix the errors and return valid JSON.'),
new HumanMessage(correctionPrompt)
]);
const jsonStr = response.content.toString()
.replace(/```json\n?/g, '')
.replace(/```\n?/g, '');
const parsed = JSON.parse(jsonStr);
const validated = DataExtractionSchema.parse(parsed);
return {
extractedData: validated,
stage: 'validation',
correctionAttempts: state.correctionAttempts + 1,
validationErrors: [] // clear errors
};
} catch (error) {
return {
stage: 'correction',
correctionAttempts: state.correctionAttempts + 1,
validationErrors: [...state.validationErrors, `Correction failed: ${error}`]
};
}
});
// Analysis node
workflow.addNode('analysis', async (state) => {
if (!state.extractedData) {
return {
stage: 'failure',
finalOutput: 'No data available for analysis'
};
}
try {
const analysisPrompt = `Analyze the following extracted data and provide insights:
Entities: ${state.extractedData.entities.join(', ')}
Relationships: ${JSON.stringify(state.extractedData.relationships)}
Return the analysis as JSON, where "confidence" is a number between 0 and 1:
{
"insights": ["insight1", "insight2"],
"confidence": 0.8,
"recommendations": ["rec1", "rec2"]
}`;
const response = await this.model.invoke([
new SystemMessage('You are a data analyst. Provide thoughtful insights.'),
new HumanMessage(analysisPrompt)
]);
const jsonStr = response.content.toString()
.replace(/```json\n?/g, '')
.replace(/```\n?/g, '');
const parsed = JSON.parse(jsonStr);
const validated = AnalysisResultSchema.parse(parsed);
return {
analysisResult: validated,
stage: 'synthesis'
};
} catch (error) {
await this.errorManager.reportError('analysis', error as Error);
// Graceful degradation
return {
analysisResult: {
insights: ['Analysis only partially completed'],
confidence: 0.3,
recommendations: ['Manual review recommended']
},
stage: 'synthesis'
};
}
});
// Synthesis node
workflow.addNode('synthesis', async (state) => {
const report = `## Analysis Report
### Extracted Data
- **Entities found**: ${state.extractedData?.entities.length || 0}
- **Relationships identified**: ${state.extractedData?.relationships.length || 0}
### Key Insights
${state.analysisResult?.insights.map(i => `- ${i}`).join('\n') || 'No insights available'}
### Confidence Level
${Math.round((state.analysisResult?.confidence || 0) * 100)}%
### Recommendations
${state.analysisResult?.recommendations.map(r => `- ${r}`).join('\n') || 'No recommendations'}
### Data Quality
- Validation errors: ${state.validationErrors.length}
- Correction attempts: ${state.correctionAttempts}
- Final status: ${state.validationErrors.length === 0 ? 'Success' : 'Partial success'}`;
return {
finalOutput: report,
stage: 'complete'
};
});
// Define edges: each router returns the `stage` set by the node, and the map picks the next node
workflow.addConditionalEdges('extraction', (s: WorkflowState) => s.stage, {
validation: 'validation',
correction: 'correction'
});
workflow.addConditionalEdges('validation', (s: WorkflowState) => s.stage, {
analysis: 'analysis',
correction: 'correction'
});
// A failed correction sets stage 'correction' again, so it loops back to itself;
// once the attempt limit is reached it sets 'failure' and goes to synthesis
workflow.addConditionalEdges('correction', (s: WorkflowState) => s.stage, {
validation: 'validation',
correction: 'correction',
failure: 'synthesis'
});
workflow.addEdge('analysis', 'synthesis');
workflow.addEdge('synthesis', END);
workflow.setEntryPoint('extraction');
return workflow.compile();
}
async execute(input: string): Promise<{
success: boolean;
output: string;
metrics: {
correctionAttempts: number;
validationErrors: string[];
errorChains: Map<string, ErrorEvent[]>;
};
}> {
const workflow = this.createWorkflow();
const result = await workflow.invoke({
messages: [new HumanMessage(input)],
stage: 'extraction',
validationErrors: [],
correctionAttempts: 0
});
return {
success: result.validationErrors.length === 0,
output: result.finalOutput || 'Processing failed',
metrics: {
correctionAttempts: result.correctionAttempts,
validationErrors: result.validationErrors,
errorChains: this.errorManager['errorChain']
}
};
}
}
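This implements a self-correcting workflow. Each extraction is validated, and on failure the validation errors are fed back to the model for up to `maxCorrectionAttempts` (three) correction attempts before the workflow falls back to a partial report.
3. Create the Monitoring Dashboard API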
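The workflow above spreads the generate → validate → feed-errors-back loop across LangGraph nodes and strips Markdown fences with regexes before calling `JSON.parse`. The following is a minimal sketch of the same loop as a plain function. It uses a slightly more forgiving JSON extraction that takes the outermost `{...}`, which also tolerates prose around the object. `generate` stands in for a model call and `validate` for a Zod parse plus the semantic checks; both are hypothetical parameters.

```typescript
// Pulls the outermost JSON object out of model output (fences, prose, etc. are ignored).
export function extractJson(text: string): unknown {
  const start = text.indexOf('{');
  const end = text.lastIndexOf('}');
  if (start === -1 || end < start) throw new Error('No JSON object found in model output');
  return JSON.parse(text.slice(start, end + 1));
}

export interface CorrectionOutcome<T> {
  value: T | null;  // null when every attempt failed
  attempts: number;
  errors: string[]; // errors from the last failed attempt ([] on success)
}

export async function selfCorrect<T>(
  generate: (previousErrors: string[]) => Promise<string>, // e.g. a model call (hypothetical)
  validate: (candidate: unknown) => string[],               // returns [] when valid
  maxAttempts: number
): Promise<CorrectionOutcome<T>> {
  let errors: string[] = [];
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      // Feed the previous attempt's errors back so the generator can fix them
      const candidate = extractJson(await generate(errors));
      errors = validate(candidate);
      if (errors.length === 0) {
        return { value: candidate as T, attempts: attempt, errors };
      }
    } catch (error) {
      // Unparseable output (or a failed call) becomes feedback for the next attempt
      errors = [`Unparseable output: ${(error as Error).message}`];
    }
  }
  return { value: null, attempts: maxAttempts, errors };
}
```

Keeping the loop this small makes the stopping rule easy to verify. The graph version adds observability and per-stage routing on top of the same idea.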
// app/api/self-correcting/route.ts
import { NextResponse } from 'next/server';
import { SelfCorrectingWorkflow } from '@/lib/workflows/self-correcting-workflow';
import { z } from 'zod';
export const runtime = 'nodejs';
export const maxDuration = 300;
const RequestSchema = z.object({
text: z.string().min(10).max(5000),
enableMonitoring: z.boolean().default(true)
});
export async function POST(req: Request) {
const encoder = new TextEncoder();
const stream = new TransformStream();
const writer = stream.writable.getWriter();
(async () => {
try {
const body = await req.json();
const validation = RequestSchema.safeParse(body);
if (!validation.success) {
await writer.write(
encoder.encode(`data: ${JSON.stringify({
type: 'error',
message: 'Invalid request',
errors: validation.error.issues
})}\n\n`)
);
// No close here: the finally block below closes the writer, and closing twice throws
return;
}
const { text, enableMonitoring } = validation.data;
// Initial acknowledgement
await writer.write(
encoder.encode(`data: ${JSON.stringify({
type: 'start',
message: 'Starting self-correcting workflow'
})}\n\n`)
);
const workflow = new SelfCorrectingWorkflow();
// Run the workflow (the events below are written after it finishes)
const startTime = Date.now();
const result = await workflow.execute(text);
const executionTime = Date.now() - startTime;
// Report correction activity
if (result.metrics.correctionAttempts > 0) {
await writer.write(
encoder.encode(`data: ${JSON.stringify({
type: 'correction',
attempts: result.metrics.correctionAttempts,
errors: result.metrics.validationErrors
})}\n\n`)
);
}
// If monitoring is enabled, send the error-chain summary
if (enableMonitoring && result.metrics.errorChains.size > 0) {
const errorSummary = Array.from(result.metrics.errorChains.entries())
.map(([agent, events]) => ({
agent,
errorCount: events.length,
handled: events.filter(e => e.handled).length
}));
await writer.write(
encoder.encode(`data: ${JSON.stringify({
type: 'monitoring',
errorSummary
})}\n\n`)
);
}
// Final result
await writer.write(
encoder.encode(`data: ${JSON.stringify({
type: 'complete',
success: result.success,
output: result.output,
executionTime,
metrics: {
correctionAttempts: result.metrics.correctionAttempts,
errorCount: result.metrics.validationErrors.length
}
})}\n\n`)
);
} catch (error) {
console.error('Workflow execution error:', error);
await writer.write(
encoder.encode(`data: ${JSON.stringify({
type: 'critical_error',
message: error instanceof Error ? error.message : 'Unknown error'
})}\n\n`)
);
} finally {
await writer.close();
}
})();
return new Response(stream.readable, {
headers: {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
},
});
}
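A streaming (Server-Sent Events) endpoint that reports workflow execution and error correction to the client. Only the initial `start` event is sent before the workflow runs; the correction, monitoring, and completion events are written after `workflow.execute()` returns.
4. Build the Interactive Monitoring Dashboard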
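Because the endpoint awaits the whole workflow before writing most of its events, the client sees corrections only at the end. A minimal sketch of emitting events as each stage finishes uses an async generator that the route can drain with `for await`. The stage list and event shape here are assumptions for illustration. LangGraph's compiled graphs also provide a `stream()` method that yields per-node updates and could play the same role as the hand-written stage list.

```typescript
export interface StageEvent {
  type: 'stage' | 'complete' | 'error';
  stage?: string;
  elapsedMs: number;
  message?: string;
}

// An SSE frame is one or more "data:" lines terminated by a blank line.
export function toSseFrame(event: StageEvent): string {
  return `data: ${JSON.stringify(event)}\n\n`;
}

// Yields an event as soon as each stage completes, instead of after all of them.
export async function* runStages(
  stages: Array<[name: string, run: () => Promise<void>]>
): AsyncGenerator<StageEvent> {
  const start = Date.now();
  for (const [name, run] of stages) {
    try {
      await run();
      yield { type: 'stage', stage: name, elapsedMs: Date.now() - start };
    } catch (error) {
      yield { type: 'error', stage: name, elapsedMs: Date.now() - start, message: (error as Error).message };
      return; // stop at the first failing stage
    }
  }
  yield { type: 'complete', elapsedMs: Date.now() - start };
}
```

In the route, the loop body would be `for await (const event of runStages(stages)) await writer.write(encoder.encode(toSseFrame(event)));`. Each write then reaches the client while later stages are still running.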
// components/SelfCorrectingDashboard.tsx
'use client';
import { useState } from 'react';
import { useMutation } from '@tanstack/react-query';
interface WorkflowEvent {
type: 'start' | 'correction' | 'monitoring' | 'complete' | 'error' | 'critical_error';
message?: string;
attempts?: number;
errors?: string[];
errorSummary?: Array<{
agent: string;
errorCount: number;
handled: number;
}>;
output?: string;
success?: boolean;
executionTime?: number;
metrics?: {
correctionAttempts: number;
errorCount: number;
};
}
export default function SelfCorrectingDashboard() {
const [inputText, setInputText] = useState('');
const [events, setEvents] = useState<WorkflowEvent[]>([]);
const [isProcessing, setIsProcessing] = useState(false);
const [enableMonitoring, setEnableMonitoring] = useState(true);
const [result, setResult] = useState<string | null>(null);
const processWorkflow = useMutation({
mutationFn: async (params: { text: string; enableMonitoring: boolean }) => {
setEvents([]);
setResult(null);
setIsProcessing(true);
const response = await fetch('/api/self-correcting', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(params),
});
if (!response.ok) {
throw new Error('Workflow failed');
}
const reader = response.body?.getReader();
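const decoder = new TextDecoder();
// Buffer text across reads: a network chunk can end in the middle of an event
let buffer = '';
while (reader) {
const { done, value } = await reader.read();
if (done) break;
// { stream: true } also keeps multi-byte characters split across chunks intact
buffer += decoder.decode(value, { stream: true });
// SSE events end with a blank line; keep the trailing partial event in the buffer
const frames = buffer.split('\n\n');
buffer = frames.pop() ?? '';
for (const frame of frames) {
if (!frame.startsWith('data: ')) continue;
try {
const event: WorkflowEvent = JSON.parse(frame.slice(6));
setEvents(prev => [...prev, event]);
if (event.type === 'complete' && event.output) {
setResult(event.output);
}
} catch (e) {
console.error('Failed to parse event:', e);
}
}
}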
},
onSettled: () => {
setIsProcessing(false);
},
});
const handleSubmit = (e: React.FormEvent) => {
e.preventDefault();
if (inputText.trim().length >= 10) {
processWorkflow.mutate({ text: inputText, enableMonitoring });
}
};
const getCorrectionStats = () => {
const correctionEvents = events.filter(e => e.type === 'correction');
if (correctionEvents.length === 0) return null;
const lastCorrection = correctionEvents[correctionEvents.length - 1];
return {
attempts: lastCorrection.attempts || 0,
errors: lastCorrection.errors || []
};
};
const getExecutionMetrics = () => {
const completeEvent = events.find(e => e.type === 'complete');
if (!completeEvent) return null;
return {
time: completeEvent.executionTime,
success: completeEvent.success,
corrections: completeEvent.metrics?.correctionAttempts || 0,
errors: completeEvent.metrics?.errorCount || 0
};
};
return (
<div className="container mx-auto p-4 space-y-4">
{/* Header */}
<div className="card bg-base-100 shadow-xl">
<div className="card-body">
<h1 className="card-title text-2xl">Self-Correcting Workflow Dashboard</h1>
<p className="text-base-content/70">
Watch the AI automatically detect and correct its own errors
</p>
</div>
</div>
{/* Input form */}
<div className="card bg-base-100 shadow-xl">
<div className="card-body">
<form onSubmit={handleSubmit} className="space-y-4">
<div className="form-control">
<label className="label">
<span className="label-text">Input text (at least 10 characters)</span>
</label>
<textarea
className="textarea textarea-bordered h-32"
placeholder="Enter text to extract and analyze..."
value={inputText}
onChange={(e) => setInputText(e.target.value)}
disabled={isProcessing}
/>
</div>
<div className="form-control">
<label className="label cursor-pointer">
<span className="label-text">Enable error monitoring</span>
<input
type="checkbox"
className="toggle toggle-primary"
checked={enableMonitoring}
onChange={(e) => setEnableMonitoring(e.target.checked)}
/>
</label>
</div>
<button
type="submit"
className="btn btn-primary w-full"
disabled={isProcessing || inputText.trim().length < 10}
>
{isProcessing ? (
<>
<span className="loading loading-spinner"></span>
Processing workflow...
</>
) : 'Run Self-Correcting Workflow'}
</button>
</form>
</div>
</div>
{/* Execution timeline */}
{events.length > 0 && (
<div className="card bg-base-100 shadow-xl">
<div className="card-body">
<h2 className="card-title">Execution Timeline</h2>
<ul className="timeline timeline-vertical">
{events.map((event, idx) => (
<li key={idx}>
{idx > 0 && <hr />}
<div className="timeline-start">{idx + 1}</div>
<div className="timeline-middle">
<svg xmlns="http://www.w3.org/2000/svg" viewBox="0 0 20 20" fill="currentColor" className={`w-5 h-5 ${
event.type === 'error' || event.type === 'critical_error' ? 'text-error' :
event.type === 'correction' ? 'text-warning' :
event.type === 'complete' ? 'text-success' :
'text-primary'
}`}>
<path fillRule="evenodd" d="M10 18a8 8 0 100-16 8 8 0 000 16zm3.857-9.809a.75.75 0 00-1.214-.882l-3.483 4.79-1.88-1.88a.75.75 0 10-1.06 1.061l2.5 2.5a.75.75 0 001.137-.089l4-5.5z" clipRule="evenodd" />
</svg>
</div>
<div className="timeline-end timeline-box">
<div className="font-semibold capitalize">{event.type}</div>
{event.message && (
<p className="text-sm opacity-80">{event.message}</p>
)}
{!!event.attempts && (
<div className="badge badge-warning badge-sm mt-1">
{event.attempts} correction attempt(s)
</div>
)}
</div>
{idx < events.length - 1 && <hr />}
</li>
))}
</ul>
</div>
</div>
)}
{/* Metrics dashboard */}
{getExecutionMetrics() && (
<div className="card bg-base-100 shadow-xl">
<div className="card-body">
<h2 className="card-title">Execution Metrics</h2>
<div className="stats stats-vertical lg:stats-horizontal shadow">
<div className="stat">
<div className="stat-title">Status</div>
<div className="stat-value text-lg">
{getExecutionMetrics()?.success ? (
<span className="text-success">Success</span>
) : (
<span className="text-warning">Partial</span>
)}
</div>
</div>
<div className="stat">
<div className="stat-title">Execution Time</div>
<div className="stat-value text-lg">
{((getExecutionMetrics()?.time || 0) / 1000).toFixed(1)} s
</div>
</div>
<div className="stat">
<div className="stat-title">Corrections</div>
<div className="stat-value text-lg">
{getExecutionMetrics()?.corrections || 0}
</div>
</div>
<div className="stat">
<div className="stat-title">Validation Errors</div>
<div className="stat-value text-lg">
{getExecutionMetrics()?.errors || 0}
</div>
</div>
</div>
</div>
</div>
)}
{/* Result display */}
{result && (
<div className="card bg-base-100 shadow-xl">
<div className="card-body">
<h2 className="card-title">Workflow Result</h2>
<div className="mockup-code">
<pre className="text-sm"><code>{result}</code></pre>
</div>
</div>
</div>
)}
{/* Error display */}
{processWorkflow.isError && (
<div className="alert alert-error shadow-lg">
<span>Workflow execution failed. Please try again.</span>
</div>
)}
</div>
);
}
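An interactive dashboard that visualizes workflow execution, error correction, and execution metrics.
Conclusion
Exception handling in agentic design patterns goes beyond simple try-catch blocks. It is about building intelligent, self-healing systems that anticipate failures, recover gracefully, and learn from errors. The patterns shown here (error boundaries, recovery managers, self-correction, and error propagation) form the foundation of production-ready agents. In serverless environments, proper timeout management, state persistence, and graceful degradation are essential. The goal is a system that not only handles errors but also uses them as opportunities to improve reliability and user experience.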
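An SSE response arrives in arbitrary network chunks, and a chunk can end partway through an event. Splitting each chunk on newlines in isolation can therefore drop or corrupt events. The following is a minimal reusable parser that buffers incomplete text until the blank line that terminates each event. It handles only single-line `data:` fields, which matches what the route above sends; the helper name `createSseParser` is hypothetical.

```typescript
// Buffers streamed text and emits one parsed payload per complete SSE event.
export function createSseParser(onData: (payload: unknown) => void) {
  let buffer = '';
  return {
    push(chunk: string): void {
      buffer += chunk;
      let boundary = buffer.indexOf('\n\n');
      // Process every complete event; anything after the last blank line waits
      while (boundary !== -1) {
        const frame = buffer.slice(0, boundary);
        buffer = buffer.slice(boundary + 2);
        for (const line of frame.split('\n')) {
          if (line.startsWith('data: ')) onData(JSON.parse(line.slice(6)));
        }
        boundary = buffer.indexOf('\n\n');
      }
    },
    // Text still waiting for its terminating blank line (useful for debugging)
    pending(): string {
      return buffer;
    }
  };
}
```

In the reader loop, each read would become `parser.push(decoder.decode(value, { stream: true }))`. The `{ stream: true }` option also keeps multi-byte characters that straddle two chunks intact.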