草案 "智能代理设计模式 - 异常处理"

aiagentsexception-handlingtypescriptlangchainlanggraphvercelerror-recovery
By sko X opus 4.19/20/202521 min read

掌握使用 TypeScript、LangChain、LangGraph 和 Vercel 无服务器平台构建容错 AI 代理的技术,优雅地处理错误、从故障中恢复并在生产环境中保持可靠性。

心智模型:自适应应急响应系统

将代理中的异常处理想象成医院的应急响应系统。就像医院针对不同严重级别有不同的协议(轻伤→护士、中度→医生、危急→完整创伤团队),代理需要分层的错误响应。网络错误就像供应延迟(等待并重试),API 故障就像设备故障(使用备用设备),系统崩溃就像断电(启动应急发电机并通知管理员)。该系统不仅对问题做出反应,还从中学习,调整协议,并维持服务连续性。您的代理同样应该及早检测问题、适当响应、优雅恢复,并从每次事件中改进。

基础示例:带错误边界的健壮代理

1. 定义错误层次结构和恢复策略

// lib/errors/exception-types.ts
import { z } from 'zod';
import { isError, isString } from 'es-toolkit';

export const ErrorLevelSchema = z.enum(['transient', 'recoverable', 'critical']);
export type ErrorLevel = z.infer<typeof ErrorLevelSchema>;

export const RecoveryStrategySchema = z.enum([
  'retry',
  'fallback',
  'cache',
  'degrade',
  'escalate',
  'abort'
]);
export type RecoveryStrategy = z.infer<typeof RecoveryStrategySchema>;

export interface ErrorContext {
  timestamp: Date;
  attempt: number;
  maxAttempts: number;
  strategy: RecoveryStrategy;
  metadata?: Record<string, any>;
}

export class AgentException extends Error {
  constructor(
    message: string,
    public level: ErrorLevel,
    public strategy: RecoveryStrategy,
    public context?: ErrorContext
  ) {
    super(message);
    this.name = 'AgentException';
  }

  static fromError(error: unknown, level: ErrorLevel = 'recoverable'): AgentException {
    if (error instanceof AgentException) return error;

    const message = isError(error) ? error.message :
                   isString(error) ? error :
                   'Unknown error occurred';

    return new AgentException(message, level, 'retry');
  }
}

export class ToolException extends AgentException {
  constructor(
    public toolName: string,
    message: string,
    strategy: RecoveryStrategy = 'fallback'
  ) {
    super(`Tool [${toolName}]: ${message}`, 'recoverable', strategy);
    this.name = 'ToolException';
  }
}

export class ValidationException extends AgentException {
  constructor(
    message: string,
    public validationErrors?: z.ZodIssue[]
  ) {
    super(message, 'transient', 'retry');
    this.name = 'ValidationException';
  }
}

创建具有明确恢复策略和上下文信息的结构化错误层次结构,用于智能错误处理。

2. 构建错误恢复管理器

// lib/recovery/recovery-manager.ts
import { retry, delay, throttle } from 'es-toolkit';
import {
  AgentException,
  ErrorLevel,
  RecoveryStrategy,
  ErrorContext
} from '@/lib/errors/exception-types';

interface RecoveryConfig {
  maxRetries: number;
  baseDelay: number;
  maxDelay: number;
  timeout: number;
  fallbackHandlers: Map<string, () => Promise<any>>;
}

export class RecoveryManager {
  private errorHistory: AgentException[] = [];
  private config: RecoveryConfig;

  constructor(config: Partial<RecoveryConfig> = {}) {
    this.config = {
      maxRetries: config.maxRetries ?? 3,
      baseDelay: config.baseDelay ?? 1000,
      maxDelay: config.maxDelay ?? 30000,
      timeout: config.timeout ?? 777000, // Vercel 限制
      fallbackHandlers: config.fallbackHandlers ?? new Map()
    };
  }

  async executeWithRecovery<T>(
    operation: () => Promise<T>,
    operationName: string
  ): Promise<T> {
    const startTime = Date.now();
    let lastError: AgentException | null = null;

    for (let attempt = 1; attempt <= this.config.maxRetries; attempt++) {
      try {
        // 检查超时
        if (Date.now() - startTime > this.config.timeout) {
          throw new AgentException(
            'Operation timeout exceeded',
            'critical',
            'abort'
          );
        }

        // 执行操作
        const result = await Promise.race([
          operation(),
          this.createTimeout(this.config.timeout - (Date.now() - startTime))
        ]);

        // 成功时清除错误历史
        if (attempt > 1) {
          console.log(`Recovery successful for ${operationName} on attempt ${attempt}`);
        }

        return result;

      } catch (error) {
        lastError = AgentException.fromError(error);
        lastError.context = {
          timestamp: new Date(),
          attempt,
          maxAttempts: this.config.maxRetries,
          strategy: this.determineStrategy(lastError, attempt)
        };

        this.errorHistory.push(lastError);

        // 应用恢复策略
        const recovered = await this.applyStrategy(
          lastError,
          operationName,
          attempt
        );

        if (recovered !== null) {
          return recovered;
        }

        // 计算退避延迟
        if (attempt < this.config.maxRetries) {
          const delayMs = Math.min(
            this.config.baseDelay * Math.pow(2, attempt - 1),
            this.config.maxDelay
          );
          await delay(delayMs);
        }
      }
    }

    throw lastError || new AgentException(
      `${operationName} failed after ${this.config.maxRetries} attempts`,
      'critical',
      'escalate'
    );
  }

  private determineStrategy(
    error: AgentException,
    attempt: number
  ): RecoveryStrategy {
    // 关键错误应立即升级
    if (error.level === 'critical') return 'escalate';

    // 瞬态错误应首先重试
    if (error.level === 'transient' && attempt < this.config.maxRetries) {
      return 'retry';
    }

    // 可恢复错误应尝试回退
    if (error.level === 'recoverable') {
      return 'fallback';
    }

    // 默认降级服务
    return 'degrade';
  }

  private async applyStrategy<T>(
    error: AgentException,
    operationName: string,
    attempt: number
  ): Promise<T | null> {
    const strategy = error.context?.strategy || error.strategy;

    switch (strategy) {
      case 'fallback':
        const fallback = this.config.fallbackHandlers.get(operationName);
        if (fallback) {
          console.log(`Applying fallback for ${operationName}`);
          return await fallback();
        }
        break;

      case 'cache':
        // 如果可用,返回缓存的结果
        console.log(`Would return cached result for ${operationName}`);
        break;

      case 'degrade':
        console.log(`Degrading service for ${operationName}`);
        return null;

      case 'escalate':
        console.error(`Escalating error for ${operationName}:`, error.message);
        throw error;

      case 'abort':
        console.error(`Aborting ${operationName}`);
        throw error;
    }

    return null;
  }

  private createTimeout(ms: number): Promise<never> {
    return new Promise((_, reject) =>
      setTimeout(() => reject(new AgentException(
        'Operation timeout',
        'transient',
        'retry'
      )), ms)
    );
  }

  getErrorHistory(): AgentException[] {
    return [...this.errorHistory];
  }

  clearHistory(): void {
    this.errorHistory = [];
  }
}

实现一个复杂的恢复管理器,根据错误类型确定并应用适当的恢复策略。

3. 创建带工具的错误感知代理

// lib/agents/error-aware-agent.ts
import { ChatGoogleGenerativeAI } from '@langchain/google-genai';
import { DynamicStructuredTool } from '@langchain/core/tools';
import { z } from 'zod';
import { RecoveryManager } from '@/lib/recovery/recovery-manager';
import { ToolException, ValidationException } from '@/lib/errors/exception-types';
import { pipe, map, filter, reduce } from 'es-toolkit';

// 处理错误的安全工具包装器
export function createSafeTool(
  name: string,
  description: string,
  schema: z.ZodSchema,
  implementation: (input: any) => Promise<any>,
  fallback?: () => Promise<any>
) {
  return new DynamicStructuredTool({
    name,
    description,
    schema,
    func: async (input) => {
      const recoveryManager = new RecoveryManager({
        maxRetries: 2,
        fallbackHandlers: fallback ?
          new Map([[name, fallback]]) :
          new Map()
      });

      try {
        // 验证输入
        const validated = schema.safeParse(input);
        if (!validated.success) {
          throw new ValidationException(
            'Invalid tool input',
            validated.error.issues
          );
        }

        // 带恢复执行
        return await recoveryManager.executeWithRecovery(
          () => implementation(validated.data),
          name
        );

      } catch (error) {
        console.error(`Tool ${name} failed:`, error);
        throw new ToolException(name,
          error instanceof Error ? error.message : 'Unknown error'
        );
      }
    }
  });
}

// 带内置错误处理的示例工具
export function createResilientTools() {
  const weatherTool = createSafeTool(
    'get_weather',
    'Get current weather for a location',
    z.object({
      location: z.string().min(1),
      units: z.enum(['celsius', 'fahrenheit']).default('celsius')
    }),
    async (input) => {
      // 模拟偶尔的失败
      if (Math.random() < 0.3) {
        throw new Error('Weather API unavailable');
      }
      return {
        location: input.location,
        temperature: 22,
        units: input.units,
        conditions: 'Partly cloudy'
      };
    },
    async () => ({
      location: 'Unknown',
      temperature: 20,
      units: 'celsius',
      conditions: 'Data unavailable',
      source: 'fallback'
    })
  );

  const calculatorTool = createSafeTool(
    'calculator',
    'Perform mathematical calculations',
    z.object({
      expression: z.string(),
      precision: z.number().int().min(0).max(10).default(2)
    }),
    async (input) => {
      try {
        // 使用 Function 构造函数的安全评估
        const result = new Function('return ' + input.expression)();
        return {
          expression: input.expression,
          result: Number(result.toFixed(input.precision))
        };
      } catch {
        throw new Error('Invalid mathematical expression');
      }
    }
  );

  const searchTool = createSafeTool(
    'web_search',
    'Search the web for information',
    z.object({
      query: z.string().min(1).max(200),
      maxResults: z.number().int().min(1).max(10).default(5)
    }),
    async (input) => {
      // 模拟可能失败的搜索
      if (Math.random() < 0.2) {
        throw new Error('Search service timeout');
      }

      return {
        query: input.query,
        results: Array.from({ length: input.maxResults }, (_, i) => ({
          title: `Result ${i + 1} for "${input.query}"`,
          snippet: `Sample content for result ${i + 1}`,
          url: `https://example.com/result-${i + 1}`
        }))
      };
    },
    async () => ({
      query: 'fallback',
      results: [],
      error: 'Search unavailable, please try again later'
    })
  );

  return [weatherTool, calculatorTool, searchTool];
}

创建具有验证、恢复机制和回退处理器的错误感知工具,以实现可靠执行。

4. 实现带错误边界的代理 API

// app/api/error-aware-agent/route.ts
import { NextResponse } from 'next/server';
import { ChatGoogleGenerativeAI } from '@langchain/google-genai';
import { createResilientTools } from '@/lib/agents/error-aware-agent';
import { RecoveryManager } from '@/lib/recovery/recovery-manager';
import { AgentException } from '@/lib/errors/exception-types';
import { createReactAgent } from '@langchain/langgraph/prebuilt';
import { HumanMessage } from '@langchain/core/messages';

export const runtime = 'nodejs';
export const maxDuration = 300;

export async function POST(req: Request) {
  const recoveryManager = new RecoveryManager();

  try {
    const { message } = await req.json();

    if (!message || typeof message !== 'string') {
      throw new AgentException(
        'Invalid request: message is required',
        'transient',
        'retry'
      );
    }

    // 带恢复执行代理
    const result = await recoveryManager.executeWithRecovery(
      async () => {
        const model = new ChatGoogleGenerativeAI({
          modelName: 'gemini-2.5-flash',
          temperature: 0.3,
          maxOutputTokens: 2048
        });

        const tools = createResilientTools();

        const agent = createReactAgent({
          llm: model,
          tools,
          messageModifier: `您是一个具有错误恢复能力的有用助手。
            如果工具失败,请优雅地承认并尽可能尝试替代方案。
            即使工具不可用,也始终提供有用的响应。`
        });

        const response = await agent.invoke({
          messages: [new HumanMessage(message)]
        });

        return response.messages[response.messages.length - 1].content;
      },
      'agent-execution'
    );

    const errorHistory = recoveryManager.getErrorHistory();

    return NextResponse.json({
      success: true,
      result,
      recoveryAttempts: errorHistory.length,
      errors: errorHistory.map(e => ({
        message: e.message,
        level: e.level,
        strategy: e.strategy,
        attempt: e.context?.attempt
      }))
    });

  } catch (error) {
    const agentError = AgentException.fromError(error, 'critical');

    console.error('Agent execution failed:', agentError);

    return NextResponse.json(
      {
        success: false,
        error: agentError.message,
        level: agentError.level,
        strategy: agentError.strategy,
        errorHistory: recoveryManager.getErrorHistory()
      },
      { status: agentError.level === 'critical' ? 500 : 503 }
    );
  }
}

实现具有详细错误跟踪和恢复尝试的全面错误边界的 API 路由。

5. 创建带错误反馈的前端

// components/ErrorAwareChat.tsx
'use client';

import { useState } from 'react';
import { useMutation } from '@tanstack/react-query';
import { pipe, groupBy, map as mapUtil } from 'es-toolkit';

interface ChatResponse {
  success: boolean;
  result?: string;
  error?: string;
  level?: string;
  recoveryAttempts?: number;
  errors?: Array<{
    message: string;
    level: string;
    strategy: string;
    attempt?: number;
  }>;
}

export default function ErrorAwareChat() {
  const [message, setMessage] = useState('');
  const [chatHistory, setChatHistory] = useState<Array<{
    role: 'user' | 'assistant' | 'error';
    content: string;
    metadata?: any;
  }>>([]);

  const sendMessage = useMutation<ChatResponse, Error, string>({
    mutationFn: async (message: string) => {
      const response = await fetch('/api/error-aware-agent', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ message }),
      });

      const data = await response.json();

      if (!response.ok && !data.success) {
        throw new Error(data.error || 'Request failed');
      }

      return data;
    },
    onSuccess: (data) => {
      if (data.success) {
        setChatHistory(prev => [
          ...prev,
          { role: 'assistant', content: data.result!, metadata: data }
        ]);
      }
    },
    onError: (error) => {
      setChatHistory(prev => [
        ...prev,
        {
          role: 'error',
          content: `错误:${error.message}`,
          metadata: { timestamp: new Date() }
        }
      ]);
    }
  });

  const handleSubmit = (e: React.FormEvent) => {
    e.preventDefault();
    if (!message.trim()) return;

    setChatHistory(prev => [...prev, { role: 'user', content: message }]);
    sendMessage.mutate(message);
    setMessage('');
  };

  const getRecoveryBadge = (attempts?: number) => {
    if (!attempts) return null;

    const badgeClass = attempts > 2 ? 'badge-warning' : 'badge-info';
    return (
      <div className={`badge ${badgeClass} badge-sm`}>
        {attempts} 次恢复尝试
      </div>
    );
  };

  return (
    <div className="card w-full bg-base-100 shadow-xl">
      <div className="card-body">
        <h2 className="card-title">错误感知 AI 助手</h2>

        {/* 聊天历史 */}
        <div className="h-96 overflow-y-auto space-y-2 p-4 bg-base-200 rounded-lg">
          {chatHistory.map((msg, idx) => (
            <div
              key={idx}
              className={`chat ${msg.role === 'user' ? 'chat-end' : 'chat-start'}`}
            >
              <div className="chat-header">
                {msg.role === 'user' ? '您' :
                 msg.role === 'assistant' ? '助手' : '系统'}
              </div>
              <div className={`chat-bubble ${
                msg.role === 'error' ? 'chat-bubble-error' :
                msg.role === 'user' ? 'chat-bubble-primary' :
                'chat-bubble-secondary'
              }`}>
                {msg.content}
                {msg.metadata?.recoveryAttempts && (
                  <div className="mt-2">
                    {getRecoveryBadge(msg.metadata.recoveryAttempts)}
                  </div>
                )}
              </div>

              {msg.metadata?.errors && msg.metadata.errors.length > 0 && (
                <div className="chat-footer opacity-50 text-xs">
                  恢复自:{msg.metadata.errors[0].strategy}
                </div>
              )}
            </div>
          ))}

          {sendMessage.isPending && (
            <div className="chat chat-start">
              <div className="chat-bubble chat-bubble-secondary">
                <span className="loading loading-dots loading-sm"></span>
              </div>
            </div>
          )}
        </div>

        {/* 输入表单 */}
        <form onSubmit={handleSubmit} className="join w-full mt-4">
          <input
            type="text"
            className="input input-bordered join-item flex-1"
            placeholder="问我任何问题..."
            value={message}
            onChange={(e) => setMessage(e.target.value)}
            disabled={sendMessage.isPending}
          />
          <button
            type="submit"
            className="btn btn-primary join-item"
            disabled={sendMessage.isPending || !message.trim()}
          >
            发送
          </button>
        </form>

        {/* 错误状态 */}
        {sendMessage.isError && (
          <div className="alert alert-error mt-2">
            <span>发送消息失败。请重试。</span>
          </div>
        )}
      </div>
    </div>
  );
}

具有错误恢复尝试的视觉反馈和优雅错误显示的 React 组件。

高级示例:自修正多代理系统

1. 实现代理层次结构中的错误传播

// lib/agents/error-propagation.ts
import { EventEmitter } from 'events';
import { z } from 'zod';
import { throttle, debounce } from 'es-toolkit';

export interface ErrorEvent {
  agentId: string;
  parentId?: string;
  error: Error;
  timestamp: Date;
  handled: boolean;
  propagated: boolean;
}

export class ErrorPropagationManager extends EventEmitter {
  private errorChain: Map<string, ErrorEvent[]> = new Map();
  private handlers: Map<string, (error: ErrorEvent) => Promise<boolean>> = new Map();

  constructor() {
    super();
    this.setupErrorHandling();
  }

  private setupErrorHandling() {
    // 节流错误发射以防止泛滥
    const throttledEmit = throttle((event: ErrorEvent) => {
      this.emit('error', event);
    }, 1000);

    this.on('error', throttledEmit);
  }

  registerAgent(
    agentId: string,
    parentId?: string,
    errorHandler?: (error: ErrorEvent) => Promise<boolean>
  ) {
    this.errorChain.set(agentId, []);

    if (errorHandler) {
      this.handlers.set(agentId, errorHandler);
    }

    if (parentId) {
      // 设置错误传播链
      this.on(`error:${agentId}`, async (event: ErrorEvent) => {
        event.propagated = true;

        // 首先尝试本地处理器
        const handled = await this.tryHandleError(agentId, event);

        if (!handled) {
          // 传播到父级
          this.emit(`error:${parentId}`, {
            ...event,
            parentId: agentId
          });
        }
      });
    }
  }

  async reportError(agentId: string, error: Error): Promise<boolean> {
    const event: ErrorEvent = {
      agentId,
      error,
      timestamp: new Date(),
      handled: false,
      propagated: false
    };

    // 存储在错误链中
    const chain = this.errorChain.get(agentId) || [];
    chain.push(event);
    this.errorChain.set(agentId, chain);

    // 发射以供处理
    this.emit(`error:${agentId}`, event);

    // 等待处理
    return new Promise((resolve) => {
      setTimeout(() => {
        resolve(event.handled);
      }, 100);
    });
  }

  private async tryHandleError(
    agentId: string,
    event: ErrorEvent
  ): Promise<boolean> {
    const handler = this.handlers.get(agentId);

    if (handler) {
      try {
        event.handled = await handler(event);
        return event.handled;
      } catch (handlerError) {
        console.error(`Handler for ${agentId} failed:`, handlerError);
        return false;
      }
    }

    return false;
  }

  getErrorChain(agentId: string): ErrorEvent[] {
    return this.errorChain.get(agentId) || [];
  }

  clearErrorChain(agentId?: string) {
    if (agentId) {
      this.errorChain.delete(agentId);
    } else {
      this.errorChain.clear();
    }
  }
}

通过本地处理和父级升级管理代理层次结构中的错误传播。

2. 构建带验证的自修正工作流

// lib/workflows/self-correcting-workflow.ts
import { StateGraph, END } from '@langchain/langgraph';
import { ChatGoogleGenerativeAI } from '@langchain/google-genai';
import { BaseMessage, HumanMessage, SystemMessage } from '@langchain/core/messages';
import { z } from 'zod';
import { ErrorPropagationManager, ErrorEvent } from '@/lib/agents/error-propagation';
import { pipe, chunk, map, filter } from 'es-toolkit';

// 输出验证架构
const DataExtractionSchema = z.object({
  entities: z.array(z.string()),
  relationships: z.array(z.object({
    source: z.string(),
    target: z.string(),
    type: z.string()
  })),
  metadata: z.record(z.any())
});

const AnalysisResultSchema = z.object({
  insights: z.array(z.string()),
  confidence: z.number().min(0).max(1),
  recommendations: z.array(z.string())
});

interface WorkflowState {
  messages: BaseMessage[];
  stage: string;
  extractedData?: z.infer<typeof DataExtractionSchema>;
  analysisResult?: z.infer<typeof AnalysisResultSchema>;
  validationErrors: string[];
  correctionAttempts: number;
  finalOutput?: string;
}

export class SelfCorrectingWorkflow {
  private model: ChatGoogleGenerativeAI;
  private errorManager: ErrorPropagationManager;
  private maxCorrectionAttempts = 3;

  constructor() {
    this.model = new ChatGoogleGenerativeAI({
      modelName: 'gemini-2.5-pro',
      temperature: 0.2,
      maxOutputTokens: 4096
    });

    this.errorManager = new ErrorPropagationManager();
    this.setupErrorHandlers();
  }

  private setupErrorHandlers() {
    // 使用错误处理器注册代理
    this.errorManager.registerAgent('extraction', undefined, async (event) => {
      console.log('Extraction error:', event.error.message);
      return false; // 让它传播
    });

    this.errorManager.registerAgent('validation', 'extraction', async (event) => {
      console.log('Validation error, attempting correction');
      return true; // 本地处理
    });

    this.errorManager.registerAgent('analysis', 'validation');
    this.errorManager.registerAgent('synthesis', 'analysis');
  }

  createWorkflow() {
    const workflow = new StateGraph<WorkflowState>({
      channels: {
        messages: {
          value: (x: BaseMessage[], y: BaseMessage[]) => [...x, ...y],
          default: () => []
        },
        stage: {
          value: (x: string, y: string) => y || x,
          default: () => 'extraction'
        },
        extractedData: {
          value: (x: any, y: any) => y || x,
          default: () => undefined
        },
        analysisResult: {
          value: (x: any, y: any) => y || x,
          default: () => undefined
        },
        validationErrors: {
          value: (x: string[], y: string[]) => [...x, ...y],
          default: () => []
        },
        correctionAttempts: {
          value: (x: number, y: number) => y ?? x,
          default: () => 0
        },
        finalOutput: {
          value: (x: string, y: string) => y || x,
          default: () => undefined
        }
      }
    });

    // 具有结构化输出的提取节点
    workflow.addNode('extraction', async (state) => {
      try {
        const prompt = `从以下文本中提取实体和关系。
返回具有此结构的 JSON:
{
  "entities": ["entity1", "entity2"],
  "relationships": [
    {"source": "entity1", "target": "entity2", "type": "relation_type"}
  ],
  "metadata": {}
}

文本:${state.messages[0].content}`;

        const response = await this.model.invoke([
          new SystemMessage('您是数据提取专家。始终返回有效的 JSON。'),
          new HumanMessage(prompt)
        ]);

        // 解析和验证 JSON
        const jsonStr = response.content.toString()
          .replace(/```json\n?/g, '')
          .replace(/```\n?/g, '');

        const parsed = JSON.parse(jsonStr);
        const validated = DataExtractionSchema.parse(parsed);

        return {
          extractedData: validated,
          stage: 'validation'
        };
      } catch (error) {
        await this.errorManager.reportError('extraction', error as Error);

        return {
          stage: 'correction',
          validationErrors: [`提取失败:${error}`]
        };
      }
    });

    // 验证节点
    workflow.addNode('validation', async (state) => {
      if (!state.extractedData) {
        return {
          stage: 'correction',
          validationErrors: ['没有要验证的数据']
        };
      }

      const errors: string[] = [];

      // 验证提取的数据质量
      if (state.extractedData.entities.length === 0) {
        errors.push('未提取到实体');
      }

      if (state.extractedData.relationships.length === 0 &&
          state.extractedData.entities.length > 1) {
        errors.push('有多个实体但未定义关系');
      }

      // 检查孤立的关系
      const entities = new Set(state.extractedData.entities);
      for (const rel of state.extractedData.relationships) {
        if (!entities.has(rel.source) || !entities.has(rel.target)) {
          errors.push(`关系引用了未知实体:${rel.source} -> ${rel.target}`);
        }
      }

      if (errors.length > 0) {
        return {
          stage: 'correction',
          validationErrors: errors
        };
      }

      return { stage: 'analysis' };
    });

    // 自修正节点
    workflow.addNode('correction', async (state) => {
      if (state.correctionAttempts >= this.maxCorrectionAttempts) {
        return {
          stage: 'failure',
          finalOutput: `在 ${this.maxCorrectionAttempts} 次修正尝试后失败。错误:${state.validationErrors.join(';')}`
        };
      }

      const correctionPrompt = `之前的提取有以下错误:
${state.validationErrors.join('\n')}

请更正此文本的提取:
${state.messages[0].content}

确保解决所有验证错误。`;

      try {
        const response = await this.model.invoke([
          new SystemMessage('您是数据提取专家。修复错误并返回有效的 JSON。'),
          new HumanMessage(correctionPrompt)
        ]);

        const jsonStr = response.content.toString()
          .replace(/```json\n?/g, '')
          .replace(/```\n?/g, '');

        const parsed = JSON.parse(jsonStr);
        const validated = DataExtractionSchema.parse(parsed);

        return {
          extractedData: validated,
          stage: 'validation',
          correctionAttempts: state.correctionAttempts + 1,
          validationErrors: [] // 清除错误
        };
      } catch (error) {
        return {
          stage: 'correction',
          correctionAttempts: state.correctionAttempts + 1,
          validationErrors: [...state.validationErrors, `修正失败:${error}`]
        };
      }
    });

    // 分析节点
    workflow.addNode('analysis', async (state) => {
      if (!state.extractedData) {
        return {
          stage: 'failure',
          finalOutput: '没有可用于分析的数据'
        };
      }

      try {
        const analysisPrompt = `分析以下提取的数据并提供见解:
实体:${state.extractedData.entities.join('、')}
关系:${JSON.stringify(state.extractedData.relationships)}

以 JSON 格式提供分析:
{
  "insights": ["insight1", "insight2"],
  "confidence": 0.0-1.0,
  "recommendations": ["rec1", "rec2"]
}`;

        const response = await this.model.invoke([
          new SystemMessage('您是数据分析师。提供深思熟虑的见解。'),
          new HumanMessage(analysisPrompt)
        ]);

        const jsonStr = response.content.toString()
          .replace(/```json\n?/g, '')
          .replace(/```\n?/g, '');

        const parsed = JSON.parse(jsonStr);
        const validated = AnalysisResultSchema.parse(parsed);

        return {
          analysisResult: validated,
          stage: 'synthesis'
        };
      } catch (error) {
        await this.errorManager.reportError('analysis', error as Error);

        // 优雅降级
        return {
          analysisResult: {
            insights: ['分析部分完成'],
            confidence: 0.3,
            recommendations: ['建议手动审查']
          },
          stage: 'synthesis'
        };
      }
    });

    // 综合节点
    workflow.addNode('synthesis', async (state) => {
      const report = `## 分析报告

### 提取的数据
- **发现的实体**:${state.extractedData?.entities.length || 0}
- **识别的关系**:${state.extractedData?.relationships.length || 0}

### 关键见解
${state.analysisResult?.insights.map(i => `- ${i}`).join('\n') || '无可用见解'}

### 置信度水平
${(state.analysisResult?.confidence || 0) * 100}%

### 建议
${state.analysisResult?.recommendations.map(r => `- ${r}`).join('\n') || '无建议'}

### 数据质量
- 遇到的验证错误:${state.validationErrors.length}
- 修正尝试:${state.correctionAttempts}
- 最终状态:${state.validationErrors.length === 0 ? '成功' : '部分成功'}`;

      return {
        finalOutput: report,
        stage: 'complete'
      };
    });

    // 定义边
    workflow.addConditionalEdges('extraction', [
      { condition: (s) => s.stage === 'validation', node: 'validation' },
      { condition: (s) => s.stage === 'correction', node: 'correction' }
    ]);

    workflow.addConditionalEdges('validation', [
      { condition: (s) => s.stage === 'analysis', node: 'analysis' },
      { condition: (s) => s.stage === 'correction', node: 'correction' }
    ]);

    workflow.addConditionalEdges('correction', [
      { condition: (s) => s.stage === 'validation', node: 'validation' },
      { condition: (s) => s.stage === 'failure', node: 'synthesis' }
    ]);

    workflow.addEdge('analysis', 'synthesis');
    workflow.addEdge('synthesis', END);

    workflow.setEntryPoint('extraction');

    return workflow.compile();
  }

  async execute(input: string): Promise<{
    success: boolean;
    output: string;
    metrics: {
      correctionAttempts: number;
      validationErrors: string[];
      errorChains: Map<string, ErrorEvent[]>;
    };
  }> {
    const workflow = this.createWorkflow();

    const result = await workflow.invoke({
      messages: [new HumanMessage(input)],
      stage: 'extraction',
      validationErrors: [],
      correctionAttempts: 0
    });

    return {
      success: result.validationErrors.length === 0,
      output: result.finalOutput || '处理失败',
      metrics: {
        correctionAttempts: result.correctionAttempts,
        validationErrors: result.validationErrors,
        errorChains: this.errorManager['errorChain']
      }
    };
  }
}

实现自修正工作流,验证输出并在检测到错误时自动尝试修正。

3. 创建监控仪表板 API

// app/api/self-correcting/route.ts
import { NextResponse } from 'next/server';
import { SelfCorrectingWorkflow } from '@/lib/workflows/self-correcting-workflow';
import { z } from 'zod';

export const runtime = 'nodejs';
export const maxDuration = 300;

const RequestSchema = z.object({
  text: z.string().min(10).max(5000),
  enableMonitoring: z.boolean().default(true)
});

export async function POST(req: Request) {
  const encoder = new TextEncoder();
  const stream = new TransformStream();
  const writer = stream.writable.getWriter();

  (async () => {
    try {
      const body = await req.json();
      const validation = RequestSchema.safeParse(body);

      if (!validation.success) {
        await writer.write(
          encoder.encode(`data: ${JSON.stringify({
            type: 'error',
            message: '无效请求',
            errors: validation.error.issues
          })}\n\n`)
        );
        await writer.close();
        return;
      }

      const { text, enableMonitoring } = validation.data;

      // 初始确认
      await writer.write(
        encoder.encode(`data: ${JSON.stringify({
          type: 'start',
          message: '启动自修正工作流'
        })}\n\n`)
      );

      const workflow = new SelfCorrectingWorkflow();

      // 执行工作流
      const startTime = Date.now();
      const result = await workflow.execute(text);
      const executionTime = Date.now() - startTime;

      // 流式传输进度更新
      if (result.metrics.correctionAttempts > 0) {
        await writer.write(
          encoder.encode(`data: ${JSON.stringify({
            type: 'correction',
            attempts: result.metrics.correctionAttempts,
            errors: result.metrics.validationErrors
          })}\n\n`)
        );
      }

      // 如果启用监控,流式传输错误链
      if (enableMonitoring && result.metrics.errorChains.size > 0) {
        const errorSummary = Array.from(result.metrics.errorChains.entries())
          .map(([agent, events]) => ({
            agent,
            errorCount: events.length,
            handled: events.filter(e => e.handled).length
          }));

        await writer.write(
          encoder.encode(`data: ${JSON.stringify({
            type: 'monitoring',
            errorSummary
          })}\n\n`)
        );
      }

      // 最终结果
      await writer.write(
        encoder.encode(`data: ${JSON.stringify({
          type: 'complete',
          success: result.success,
          output: result.output,
          executionTime,
          metrics: {
            correctionAttempts: result.metrics.correctionAttempts,
            errorCount: result.metrics.validationErrors.length
          }
        })}\n\n`)
      );

    } catch (error) {
      console.error('Workflow execution error:', error);

      await writer.write(
        encoder.encode(`data: ${JSON.stringify({
          type: 'critical_error',
          message: error instanceof Error ? error.message : '未知错误'
        })}\n\n`)
      );
    } finally {
      await writer.close();
    }
  })();

  return new Response(stream.readable, {
    headers: {
      'Content-Type': 'text/event-stream',
      'Cache-Control': 'no-cache',
      'Connection': 'keep-alive',
    },
  });
}

提供工作流执行和错误修正实时更新的流式 API 端点。

4. 构建交互式监控仪表板

// components/SelfCorrectingDashboard.tsx
'use client';

import { useState, useEffect } from 'react';
import { useMutation } from '@tanstack/react-query';
import { pipe, groupBy, map as mapUtil, reduce } from 'es-toolkit';

interface WorkflowEvent {
  type: 'start' | 'correction' | 'monitoring' | 'complete' | 'error' | 'critical_error';
  message?: string;
  attempts?: number;
  errors?: string[];
  errorSummary?: Array<{
    agent: string;
    errorCount: number;
    handled: number;
  }>;
  output?: string;
  success?: boolean;
  executionTime?: number;
  metrics?: {
    correctionAttempts: number;
    errorCount: number;
  };
}

export default function SelfCorrectingDashboard() {
  const [inputText, setInputText] = useState('');
  const [events, setEvents] = useState<WorkflowEvent[]>([]);
  const [isProcessing, setIsProcessing] = useState(false);
  const [enableMonitoring, setEnableMonitoring] = useState(true);
  const [result, setResult] = useState<string | null>(null);

  const processWorkflow = useMutation({
    mutationFn: async (params: { text: string; enableMonitoring: boolean }) => {
      setEvents([]);
      setResult(null);
      setIsProcessing(true);

      const response = await fetch('/api/self-correcting', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify(params),
      });

      if (!response.ok) {
        throw new Error('工作流失败');
      }

      const reader = response.body?.getReader();
      const decoder = new TextDecoder();

      while (reader) {
        const { done, value } = await reader.read();
        if (done) break;

        const chunk = decoder.decode(value);
        const lines = chunk.split('\n');

        for (const line of lines) {
          if (line.startsWith('data: ')) {
            try {
              const event: WorkflowEvent = JSON.parse(line.slice(6));
              setEvents(prev => [...prev, event]);

              if (event.type === 'complete' && event.output) {
                setResult(event.output);
              }
            } catch (e) {
              console.error('Failed to parse event:', e);
            }
          }
        }
      }
    },
    onSettled: () => {
      setIsProcessing(false);
    },
  });

  const handleSubmit = (e: React.FormEvent) => {
    e.preventDefault();
    if (inputText.trim().length >= 10) {
      processWorkflow.mutate({ text: inputText, enableMonitoring });
    }
  };

  const getCorrectionStats = () => {
    const correctionEvents = events.filter(e => e.type === 'correction');
    if (correctionEvents.length === 0) return null;

    const lastCorrection = correctionEvents[correctionEvents.length - 1];
    return {
      attempts: lastCorrection.attempts || 0,
      errors: lastCorrection.errors || []
    };
  };

  const getExecutionMetrics = () => {
    const completeEvent = events.find(e => e.type === 'complete');
    if (!completeEvent) return null;

    return {
      time: completeEvent.executionTime,
      success: completeEvent.success,
      corrections: completeEvent.metrics?.correctionAttempts || 0,
      errors: completeEvent.metrics?.errorCount || 0
    };
  };

  return (
    <div className="container mx-auto p-4 space-y-4">
      {/* 标题 */}
      <div className="card bg-base-100 shadow-xl">
        <div className="card-body">
          <h1 className="card-title text-2xl">自修正工作流仪表板</h1>
          <p className="text-base-content/70">
            观察 AI 实时自动检测和纠正错误
          </p>
        </div>
      </div>

      {/* 输入表单 */}
      <div className="card bg-base-100 shadow-xl">
        <div className="card-body">
          <form onSubmit={handleSubmit} className="space-y-4">
            <div className="form-control">
              <label className="label">
                <span className="label-text">输入文本(最少 10 个字符)</span>
              </label>
              <textarea
                className="textarea textarea-bordered h-32"
                placeholder="输入用于提取和分析的文本..."
                value={inputText}
                onChange={(e) => setInputText(e.target.value)}
                disabled={isProcessing}
              />
            </div>

            <div className="form-control">
              <label className="label cursor-pointer">
                <span className="label-text">启用错误监控</span>
                <input
                  type="checkbox"
                  className="toggle toggle-primary"
                  checked={enableMonitoring}
                  onChange={(e) => setEnableMonitoring(e.target.checked)}
                />
              </label>
            </div>

            <button
              type="submit"
              className="btn btn-primary w-full"
              disabled={isProcessing || inputText.trim().length < 10}
            >
              {isProcessing ? (
                <>
                  <span className="loading loading-spinner"></span>
                  正在处理工作流...
                </>
              ) : '执行自修正工作流'}
            </button>
          </form>
        </div>
      </div>

      {/* 执行时间线 */}
      {events.length > 0 && (
        <div className="card bg-base-100 shadow-xl">
          <div className="card-body">
            <h2 className="card-title">执行时间线</h2>

            <ul className="timeline timeline-vertical">
              {events.map((event, idx) => (
                <li key={idx}>
                  {idx > 0 && <hr />}
                  <div className="timeline-start">{idx + 1}</div>
                  <div className="timeline-middle">
                    <svg xmlns="http://www.w3.org/2000/svg" viewBox="0 0 20 20" fill="currentColor" className={`w-5 h-5 ${
                      event.type === 'error' || event.type === 'critical_error' ? 'text-error' :
                      event.type === 'correction' ? 'text-warning' :
                      event.type === 'complete' ? 'text-success' :
                      'text-primary'
                    }`}>
                      <path fillRule="evenodd" d="M10 18a8 8 0 100-16 8 8 0 000 16zm3.857-9.809a.75.75 0 00-1.214-.882l-3.483 4.79-1.88-1.88a.75.75 0 10-1.06 1.061l2.5 2.5a.75.75 0 001.137-.089l4-5.5z" clipRule="evenodd" />
                    </svg>
                  </div>
                  <div className="timeline-end timeline-box">
                    <div className="font-semibold capitalize">{event.type}</div>
                    {event.message && (
                      <p className="text-sm opacity-80">{event.message}</p>
                    )}
                    {event.attempts && (
                      <div className="badge badge-warning badge-sm mt-1">
                        {event.attempts} 次修正尝试
                      </div>
                    )}
                  </div>
                  {idx < events.length - 1 && <hr />}
                </li>
              ))}
            </ul>
          </div>
        </div>
      )}

      {/* 指标仪表板 */}
      {getExecutionMetrics() && (
        <div className="card bg-base-100 shadow-xl">
          <div className="card-body">
            <h2 className="card-title">执行指标</h2>

            <div className="stats stats-vertical lg:stats-horizontal shadow">
              <div className="stat">
                <div className="stat-title">状态</div>
                <div className="stat-value text-lg">
                  {getExecutionMetrics()?.success ? (
                    <span className="text-success">成功</span>
                  ) : (
                    <span className="text-warning">部分</span>
                  )}
                </div>
              </div>

              <div className="stat">
                <div className="stat-title">执行时间</div>
                <div className="stat-value text-lg">
                  {(getExecutionMetrics()?.time || 0) / 1000}秒
                </div>
              </div>

              <div className="stat">
                <div className="stat-title">修正</div>
                <div className="stat-value text-lg">
                  {getExecutionMetrics()?.corrections || 0}
                </div>
              </div>

              <div className="stat">
                <div className="stat-title">处理的错误</div>
                <div className="stat-value text-lg">
                  {getExecutionMetrics()?.errors || 0}
                </div>
              </div>
            </div>
          </div>
        </div>
      )}

      {/* 结果显示 */}
      {result && (
        <div className="card bg-base-100 shadow-xl">
          <div className="card-body">
            <h2 className="card-title">工作流结果</h2>
            <div className="mockup-code">
              <pre className="text-sm"><code>{result}</code></pre>
            </div>
          </div>
        </div>
      )}

      {/* 错误显示 */}
      {processWorkflow.isError && (
        <div className="alert alert-error shadow-lg">
          <span>工作流执行失败。请重试。</span>
        </div>
      )}
    </div>
  );
}

提供工作流执行、错误修正和性能指标实时可视化的交互式仪表板。

结论

智能代理设计模式中的异常处理不仅仅是简单的 try-catch 块——它是关于构建智能、自愈的系统,这些系统能够预见故障、优雅恢复并从错误中学习。这里展示的模式(错误边界、恢复管理器、自修正和错误传播)构成了生产就绪代理的基础。请记住,在无服务器环境中,适当的超时管理、状态持久性和优雅降级是必不可少的。关键是构建不仅能处理错误,而且能将其作为提高可靠性和用户体验机会的系统。