Draft: Agent Design Patterns - Evaluation and Monitoring

ai · agents · monitoring · evaluation · typescript · langchain · langgraph · vercel

By sko X opus 4.1 · 9/21/2025 · 18 min read

Learn how to implement comprehensive evaluation and monitoring for AI agents in production. We will build observable, testable, and optimizable agent systems with TypeScript, LangChain, and LangGraph on the Vercel platform.

Mental Model: The Observatory Pattern

Think of agent evaluation and monitoring as operating a space telescope observatory. You need multiple instruments (metrics) observing different wavelengths (aspects of behavior), continuous tracking (monitoring) to catch transient events (errors/anomalies), and calibration systems (evaluation) to keep the readings accurate. Just as astronomers combine data from several telescopes to understand a celestial object, we combine several evaluation and monitoring techniques to build a complete picture of agent behavior.

Basic Example: An Agent with Built-in Evaluation

Let's start with a simple customer support agent that includes basic evaluation and monitoring.

1. Define Evaluation Types and Metrics

// app/lib/evaluation/types.ts
import { z } from 'zod';

export const EvaluationMetricSchema = z.object({
  accuracy: z.number().min(0).max(1),
  relevance: z.number().min(0).max(1),
  coherence: z.number().min(0).max(1),
  latency: z.number(),
  tokenUsage: z.object({
    input: z.number(),
    output: z.number(),
    total: z.number(),
  }),
  cost: z.number(),
  timestamp: z.string().datetime(),
});

export type EvaluationMetric = z.infer<typeof EvaluationMetricSchema>;

export const AgentTraceSchema = z.object({
  traceId: z.string(),
  parentId: z.string().optional(),
  agentName: z.string(),
  input: z.any(),
  output: z.any(),
  metrics: EvaluationMetricSchema,
  errors: z.array(z.string()).default([]),
  metadata: z.record(z.any()).default({}),
});

export type AgentTrace = z.infer<typeof AgentTraceSchema>;

These types define the structure of the evaluation data, with runtime validation via Zod.
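To make the range constraints concrete, here is a dependency-free sketch of the same checks the schema enforces at parse time. The `MetricInput` shape and `validateMetric` helper are illustrative only; the real module should keep using Zod.

```typescript
// Hand-rolled sketch of the runtime checks the Zod schema performs:
// accuracy/relevance/coherence must be in [0, 1], latency non-negative.

interface MetricInput {
  accuracy: number;
  relevance: number;
  coherence: number;
  latency: number;
}

// Returns a list of human-readable violations (empty means valid).
function validateMetric(m: MetricInput): string[] {
  const errors: string[] = [];
  for (const field of ['accuracy', 'relevance', 'coherence'] as const) {
    const v = m[field];
    if (v < 0 || v > 1) errors.push(`${field} must be in [0, 1], got ${v}`);
  }
  if (m.latency < 0) errors.push(`latency must be >= 0, got ${m.latency}`);
  return errors;
}

console.log(validateMetric({ accuracy: 0.9, relevance: 0.8, coherence: 1, latency: 120 })); // []
console.log(validateMetric({ accuracy: 1.2, relevance: 0.8, coherence: 1, latency: -5 }));
```

Zod's `safeParse` gives you the same pass/fail answer plus structured error paths, which is why the real code prefers it.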

2. Create a Monitoring Callback Handler

// app/lib/monitoring/callback.ts
import { BaseCallbackHandler } from '@langchain/core/callbacks/base';
import { Serialized } from '@langchain/core/load/serializable';
import { ChainValues } from '@langchain/core/utils/types';
import { AgentTrace, EvaluationMetric } from '../evaluation/types';
import { v4 as uuidv4 } from 'uuid';

export class MonitoringCallbackHandler extends BaseCallbackHandler {
  name = 'MonitoringCallbackHandler';
  private traces: Map<string, Partial<AgentTrace>> = new Map();
  private startTimes: Map<string, number> = new Map();

  async handleChainStart(
    chain: Serialized,
    inputs: ChainValues,
    runId: string,
  ): Promise<void> {
    const traceId = uuidv4();
    this.startTimes.set(runId, Date.now());

    this.traces.set(runId, {
      traceId,
      agentName: chain.id?.[chain.id.length - 1] || 'unknown',
      input: inputs,
      timestamp: new Date().toISOString(),
      errors: [],
      metadata: {},
    });
  }

  async handleChainEnd(
    outputs: ChainValues,
    runId: string,
  ): Promise<void> {
    const trace = this.traces.get(runId);
    const startTime = this.startTimes.get(runId);

    if (trace && startTime) {
      const latency = Date.now() - startTime;

      // Estimate token usage (simplified; in production, read it from the LLM response)
      const tokenUsage = {
        input: JSON.stringify(trace.input).length / 4, // rough estimate
        output: JSON.stringify(outputs).length / 4,
        total: 0,
      };
      tokenUsage.total = tokenUsage.input + tokenUsage.output;

      // Estimate cost (based on Gemini Pro pricing)
      const cost = (tokenUsage.input * 0.00025 + tokenUsage.output * 0.0005) / 1000;

      const metrics: EvaluationMetric = {
        accuracy: 0, // computed later by the evaluator
        relevance: 0,
        coherence: 0,
        latency,
        tokenUsage,
        cost,
        timestamp: new Date().toISOString(),
      };

      trace.output = outputs;
      trace.metrics = metrics;

      // Forward to the monitoring service
      await this.sendToMonitoring(trace as AgentTrace);
    }

    this.traces.delete(runId);
    this.startTimes.delete(runId);
  }

  async handleChainError(
    err: Error,
    runId: string,
  ): Promise<void> {
    const trace = this.traces.get(runId);
    if (trace) {
      trace.errors = [...(trace.errors || []), err.message];
      await this.sendToMonitoring(trace as AgentTrace);
    }
  }

  private async sendToMonitoring(trace: AgentTrace): Promise<void> {
    // In production, send this to a monitoring service
    console.log('Trace:', JSON.stringify(trace, null, 2));

    // Example: send to Langfuse, DataDog, or a custom endpoint
    if (process.env.MONITORING_ENDPOINT) {
      await fetch(process.env.MONITORING_ENDPOINT, {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify(trace),
      });
    }
  }
}

The custom callback handler captures detailed metrics from agent executions.
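The token and cost arithmetic inside the handler is worth isolating into a small, testable helper. This sketch uses the same rough 4-characters-per-token heuristic; the per-1K-token prices are placeholders, and real token counts should come from the LLM response metadata.

```typescript
// Rough token/cost estimation mirroring the callback handler above.
// The 4-chars-per-token heuristic and the prices are assumptions.

interface TokenUsage {
  input: number;
  output: number;
  total: number;
}

// Approximate token count from raw text length (~4 chars per token).
function estimateTokens(text: string): number {
  return Math.ceil(text.length / 4);
}

// Compute usage and cost for one call, given per-1K-token prices.
function estimateCost(
  input: string,
  output: string,
  inputPricePer1K = 0.00025,  // assumed input price, USD per 1K tokens
  outputPricePer1K = 0.0005,  // assumed output price, USD per 1K tokens
): { tokenUsage: TokenUsage; cost: number } {
  const usage: TokenUsage = {
    input: estimateTokens(input),
    output: estimateTokens(output),
    total: 0,
  };
  usage.total = usage.input + usage.output;
  const cost =
    (usage.input * inputPricePer1K + usage.output * outputPricePer1K) / 1000;
  return { tokenUsage: usage, cost };
}

const { tokenUsage, cost } = estimateCost('a'.repeat(400), 'b'.repeat(800));
console.log(tokenUsage.total, cost.toFixed(6)); // 300 tokens, $0.000125
```

Keeping the heuristic in one function makes it trivial to swap in exact counts from the provider's usage field later.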

3. Implement an LLM-as-Judge Evaluator

// app/lib/evaluation/evaluator.ts
import { ChatGoogleGenerativeAI } from '@langchain/google-genai';
import { PromptTemplate } from '@langchain/core/prompts';
import { z } from 'zod';
import { StructuredOutputParser } from 'langchain/output_parsers';
import { EvaluationMetric } from './types';
import { memoize } from 'es-toolkit';

const EvaluationResultSchema = z.object({
  accuracy: z.number().min(0).max(1),
  relevance: z.number().min(0).max(1),
  coherence: z.number().min(0).max(1),
  reasoning: z.string(),
});

export class LLMEvaluator {
  private model: ChatGoogleGenerativeAI;
  private parser: StructuredOutputParser<z.infer<typeof EvaluationResultSchema>>;

  constructor() {
    this.model = new ChatGoogleGenerativeAI({
      modelName: 'gemini-2.5-pro',
      temperature: 0,
      apiKey: process.env.GOOGLE_API_KEY,
    });

    this.parser = StructuredOutputParser.fromZodSchema(EvaluationResultSchema);
  }

  // Memoize evaluations so identical inputs are judged only once, reducing cost.
  // Note: es-toolkit's memoize caches on a single argument, so the
  // parameters are bundled into one object.
  evaluate = memoize(
    async (params: { input: string; output: string; expectedOutput?: string }) => {
      const formatInstructions = this.parser.getFormatInstructions();

      const prompt = PromptTemplate.fromTemplate(`
        Evaluate the following agent response:

        Input: {input}
        Agent output: {output}
        {expectedOutput}

        Grade it on these criteria:
        1. Accuracy: How factually correct is the response? (0-1)
        2. Relevance: How well does it address the input? (0-1)
        3. Coherence: How clear and well structured is it? (0-1)

        {formatInstructions}
      `);

      const response = await this.model.invoke(
        await prompt.format({
          input: params.input,
          output: params.output,
          expectedOutput: params.expectedOutput
            ? `Expected output: ${params.expectedOutput}`
            : '',
          formatInstructions,
        })
      );

      return this.parser.parse(response.content as string);
    },
    {
      // Cache key derived from the serialized parameters
      getCacheKey: (params) => JSON.stringify(params),
    }
  );

  async evaluateWithMetrics(
    input: string,
    output: string,
    metrics: Partial<EvaluationMetric>,
    expectedOutput?: string
  ): Promise<EvaluationMetric> {
    const evaluation = await this.evaluate({ input, output, expectedOutput });

    return {
      ...metrics,
      accuracy: evaluation.accuracy,
      relevance: evaluation.relevance,
      coherence: evaluation.coherence,
    } as EvaluationMetric;
  }
}

The LLM-based evaluator scores response quality automatically.
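The caching idea is easy to demonstrate without an LLM. This standalone sketch is a hand-rolled stand-in for es-toolkit's `memoize`, with a toy word-overlap scorer in place of the model call; it shows that identical argument tuples trigger only one underlying evaluation.

```typescript
// Minimal memoizer keyed on serialized arguments, with a miss counter
// so the caching effect is observable.

function memoizeEval<A extends unknown[], R>(fn: (...args: A) => R) {
  const cache = new Map<string, R>();
  let misses = 0;
  const memoized = (...args: A): R => {
    const key = JSON.stringify(args); // cache key: serialized arguments
    if (cache.has(key)) return cache.get(key)!;
    misses++;
    const result = fn(...args);
    cache.set(key, result);
    return result;
  };
  return { memoized, misses: () => misses };
}

// Stand-in for the real LLM judge: score relevance by shared-word overlap.
const { memoized: judge, misses } = memoizeEval(
  (input: string, output: string) => {
    const inWords = new Set(input.toLowerCase().split(/\s+/));
    const outWords = output.toLowerCase().split(/\s+/);
    const overlap = outWords.filter(w => inWords.has(w)).length;
    return { relevance: overlap / outWords.length };
  }
);

judge('reset my password', 'click here to reset your password');
judge('reset my password', 'click here to reset your password'); // cache hit
console.log(misses()); // 1 underlying call for 2 invocations
```

With a real judge the saved call is an LLM round trip, which is where the cost reduction comes from.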

4. Create a Monitored Agent

// app/lib/agents/monitored-agent.ts
import { StateGraph, Annotation } from '@langchain/langgraph';
import { ChatGoogleGenerativeAI } from '@langchain/google-genai';
import { HumanMessage, AIMessage } from '@langchain/core/messages';
import { MonitoringCallbackHandler } from '../monitoring/callback';
import { LLMEvaluator } from '../evaluation/evaluator';
import { MemorySaver } from '@langchain/langgraph';

const StateAnnotation = Annotation.Root({
  messages: Annotation<(HumanMessage | AIMessage)[]>({
    reducer: (curr, next) => [...curr, ...next],
    default: () => [],
  }),
  evaluationResults: Annotation<any[]>({
    reducer: (curr, next) => [...curr, ...next],
    default: () => [],
  }),
});

export async function createMonitoredAgent() {
  const model = new ChatGoogleGenerativeAI({
    modelName: 'gemini-2.5-pro',
    temperature: 0.7,
    apiKey: process.env.GOOGLE_API_KEY,
  });

  const monitoringHandler = new MonitoringCallbackHandler();
  const evaluator = new LLMEvaluator();
  const memory = new MemorySaver();

  const workflow = new StateGraph(StateAnnotation)
    .addNode('process', async (state) => {
      const lastMessage = state.messages[state.messages.length - 1];

      const response = await model.invoke(
        state.messages,
        { callbacks: [monitoringHandler] }
      );

      // Automatically evaluate the response
      const evaluation = await evaluator.evaluateWithMetrics(
        lastMessage.content as string,
        response.content as string,
        {
          latency: 0, // populated by the callback handler
          tokenUsage: { input: 0, output: 0, total: 0 },
          cost: 0,
          timestamp: new Date().toISOString(),
        }
      );

      return {
        messages: [response],
        evaluationResults: [evaluation],
      };
    })
    .addEdge('__start__', 'process')
    .addEdge('process', '__end__');

  return workflow.compile({
    checkpointer: memory,
  });
}

// Usage
export async function handleAgentRequest(input: string, sessionId: string) {
  const agent = await createMonitoredAgent();

  const result = await agent.invoke(
    {
      messages: [new HumanMessage(input)],
    },
    {
      configurable: { thread_id: sessionId },
    }
  );

  // Access the evaluation results
  const latestEvaluation = result.evaluationResults[result.evaluationResults.length - 1];

  return {
    response: result.messages[result.messages.length - 1].content,
    evaluation: latestEvaluation,
  };
}

The agent now has monitoring and evaluation built in.

5. Create an API Endpoint

// app/api/agent/monitored/route.ts
import { NextRequest, NextResponse } from 'next/server';
import { handleAgentRequest } from '@/lib/agents/monitored-agent';
import { z } from 'zod';

const RequestSchema = z.object({
  message: z.string().min(1),
  sessionId: z.string().default(() => `session-${Date.now()}`),
});

export async function POST(request: NextRequest) {
  try {
    const body = await request.json();
    const { message, sessionId } = RequestSchema.parse(body);

    const result = await handleAgentRequest(message, sessionId);

    // Log monitoring metrics
    console.log('Evaluation metrics:', {
      accuracy: result.evaluation.accuracy,
      relevance: result.evaluation.relevance,
      coherence: result.evaluation.coherence,
      cost: result.evaluation.cost,
    });

    return NextResponse.json(result);
  } catch (error) {
    console.error('Agent error:', error);
    return NextResponse.json(
      { error: 'Failed to process request' },
      { status: 500 }
    );
  }
}

An API endpoint with built-in metric logging.

Advanced Example: A Production Monitoring System

Now let's build a comprehensive monitoring system with distributed tracing, semantic caching, and a real-time dashboard.

1. Implement a Distributed Tracing System

// app/lib/tracing/distributed-tracer.ts
import { trace, context, SpanStatusCode, SpanKind } from '@opentelemetry/api';
import { NodeTracerProvider } from '@opentelemetry/sdk-trace-node';
import { Resource } from '@opentelemetry/resources';
import { SemanticResourceAttributes } from '@opentelemetry/semantic-conventions';
import { BatchSpanProcessor } from '@opentelemetry/sdk-trace-base';
import { OTLPTraceExporter } from '@opentelemetry/exporter-trace-otlp-http';
import { registerInstrumentations } from '@opentelemetry/instrumentation';
import { v4 as uuidv4 } from 'uuid';
import { debounce } from 'es-toolkit';

export class DistributedTracer {
  private tracer;
  private provider: NodeTracerProvider;

  constructor(serviceName: string = 'agent-system') {
    // Initialize the tracer provider
    this.provider = new NodeTracerProvider({
      resource: new Resource({
        [SemanticResourceAttributes.SERVICE_NAME]: serviceName,
        [SemanticResourceAttributes.SERVICE_VERSION]: '1.0.0',
      }),
    });

    // Configure the exporter (use your own OTLP endpoint in production)
    const exporter = new OTLPTraceExporter({
      url: process.env.OTLP_ENDPOINT || 'http://localhost:4318/v1/traces',
    });

    this.provider.addSpanProcessor(new BatchSpanProcessor(exporter));
    this.provider.register();

    this.tracer = trace.getTracer('agent-tracer', '1.0.0');
  }

  traceAgent(agentName: string, operation: string) {
    return this.tracer.startSpan(`${agentName}.${operation}`, {
      kind: SpanKind.INTERNAL,
      attributes: {
        'agent.name': agentName,
        'agent.operation': operation,
        'agent.trace_id': uuidv4(),
      },
    });
  }

  traceMultiAgent(
    agents: string[],
    parentSpan?: any
  ) {
    const ctx = parentSpan
      ? trace.setSpan(context.active(), parentSpan)
      : context.active();

    return agents.map(agent =>
      this.tracer.startSpan(`multi-agent.${agent}`, {
        kind: SpanKind.INTERNAL,
        attributes: {
          'agent.name': agent,
          'agent.type': 'multi-agent',
        },
      }, ctx)
    );
  }

  // Debounce metric writes for high-frequency operations; only the most
  // recent call within the quiet window is applied
  recordMetrics = debounce(
    (span: any, metrics: Record<string, any>) => {
      Object.entries(metrics).forEach(([key, value]) => {
        span.setAttribute(`metric.${key}`, value);
      });
    },
    100 // 100ms quiet window
  );

  endSpan(span: any, status: 'success' | 'error' = 'success', error?: Error) {
    if (status === 'error' && error) {
      span.recordException(error);
      span.setStatus({
        code: SpanStatusCode.ERROR,
        message: error.message,
      });
    } else {
      span.setStatus({ code: SpanStatusCode.OK });
    }
    span.end();
  }
}

// Singleton instance
export const tracer = new DistributedTracer();

Distributed tracing for complex multi-agent workflows.
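Stripped of the OpenTelemetry machinery, a span is just a named record with a start time, an end time, attributes, and an optional parent link. This dependency-free sketch (the `MiniTracer` class is illustrative, not the OTel API) shows the lifecycle the tracer above manages.

```typescript
// Minimal span model: start, annotate, end, and keep parent links so
// multi-agent runs group under one root span.

interface Span {
  name: string;
  parent?: string;
  attributes: Record<string, string | number>;
  start: number;
  end?: number;
}

class MiniTracer {
  spans: Span[] = [];

  startSpan(name: string, parent?: string): Span {
    const span: Span = { name, parent, attributes: {}, start: Date.now() };
    this.spans.push(span);
    return span;
  }

  endSpan(span: Span, attributes: Record<string, string | number> = {}) {
    Object.assign(span.attributes, attributes);
    span.end = Date.now();
  }

  // Child spans of a multi-agent run, grouped under one parent.
  children(parentName: string): Span[] {
    return this.spans.filter(s => s.parent === parentName);
  }
}

const t = new MiniTracer();
const root = t.startSpan('multi-agent.run');
const a = t.startSpan('multi-agent.researcher', root.name);
t.endSpan(a, { 'agent.name': 'researcher' });
const b = t.startSpan('multi-agent.writer', root.name);
t.endSpan(b, { 'agent.name': 'writer' });
t.endSpan(root);
console.log(t.children('multi-agent.run').length); // 2 child spans
```

OpenTelemetry adds context propagation, sampling, and batched export on top of exactly this structure.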

2. Build a Semantic Cache Layer

// app/lib/cache/semantic-cache.ts
import { Redis } from '@upstash/redis';
import { GoogleGenerativeAIEmbeddings } from '@langchain/google-genai';
import { LRUCache } from 'lru-cache';
import { z } from 'zod';

// Cosine similarity between two embedding vectors
function cosineSimilarity(a: number[], b: number[]): number {
  let dot = 0;
  let normA = 0;
  let normB = 0;
  for (let i = 0; i < a.length; i++) {
    dot += a[i] * b[i];
    normA += a[i] * a[i];
    normB += b[i] * b[i];
  }
  return dot / (Math.sqrt(normA) * Math.sqrt(normB));
}

const CacheEntrySchema = z.object({
  key: z.string(),
  embedding: z.array(z.number()),
  response: z.string(),
  metadata: z.object({
    timestamp: z.string(),
    hitCount: z.number(),
    cost: z.number(),
  }),
});

type CacheEntry = z.infer<typeof CacheEntrySchema>;

export class SemanticCache {
  private redis: Redis;
  private embeddings: GoogleGenerativeAIEmbeddings;
  private localCache: LRUCache<string, CacheEntry>;
  private similarityThreshold = 0.95;

  constructor() {
    this.redis = new Redis({
      url: process.env.UPSTASH_REDIS_URL!,
      token: process.env.UPSTASH_REDIS_TOKEN!,
    });

    this.embeddings = new GoogleGenerativeAIEmbeddings({
      modelName: 'embedding-001',
      apiKey: process.env.GOOGLE_API_KEY,
    });

    // Local LRU cache for hot entries
    this.localCache = new LRUCache<string, CacheEntry>({
      max: 100,
      ttl: 1000 * 60 * 5, // 5 minutes
    });
  }

  async get(query: string): Promise<string | null> {
    // Check the local cache first
    const localHit = this.checkLocalCache(query);
    if (localHit) {
      console.log('Local cache hit');
      return localHit;
    }

    // Generate an embedding for the query
    const queryEmbedding = await this.embeddings.embedQuery(query);

    // Scan Redis (fine for small caches; use a vector index at scale)
    const cacheKeys = await this.redis.keys('cache:*');

    for (const key of cacheKeys) {
      const entry = await this.redis.get<CacheEntry>(key);
      if (!entry) continue;

      const similarity = cosineSimilarity(queryEmbedding, entry.embedding);

      if (similarity >= this.similarityThreshold) {
        console.log(`Semantic cache hit (similarity: ${similarity})`);

        // Update the hit count
        entry.metadata.hitCount++;
        await this.redis.set(key, entry);

        // Add to the local cache
        this.localCache.set(query, entry);

        return entry.response;
      }
    }

    return null;
  }

  async set(query: string, response: string, cost: number = 0): Promise<void> {
    const embedding = await this.embeddings.embedQuery(query);

    const entry: CacheEntry = {
      key: query,
      embedding,
      response,
      metadata: {
        timestamp: new Date().toISOString(),
        hitCount: 0,
        cost,
      },
    };

    // Store in Redis with an expiry
    const key = `cache:${Date.now()}`;
    await this.redis.set(key, entry, {
      ex: 60 * 60 * 24, // 24 hours
    });

    // Also store in the local cache
    this.localCache.set(query, entry);
  }

  private checkLocalCache(query: string): string | null {
    const entry = this.localCache.get(query);
    return entry?.response || null;
  }

  async getCacheStats(): Promise<{
    totalEntries: number;
    totalHits: number;
    costSaved: number;
  }> {
    const keys = await this.redis.keys('cache:*');
    let totalHits = 0;
    let costSaved = 0;

    for (const key of keys) {
      const entry = await this.redis.get<CacheEntry>(key);
      if (entry) {
        totalHits += entry.metadata.hitCount;
        costSaved += entry.metadata.cost * entry.metadata.hitCount;
      }
    }

    return {
      totalEntries: keys.length,
      totalHits,
      costSaved,
    };
  }
}

The semantic cache significantly reduces cost and latency for repeated or near-duplicate queries.
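The heart of the cache is a nearest-neighbour check against a similarity threshold. This sketch uses toy 3-dimensional vectors in place of real model embeddings; the `Entry` shape and `lookup` helper are illustrative names.

```typescript
// Threshold-based semantic lookup over stored embeddings.

function cosineSimilarity(a: number[], b: number[]): number {
  let dot = 0, normA = 0, normB = 0;
  for (let i = 0; i < a.length; i++) {
    dot += a[i] * b[i];
    normA += a[i] * a[i];
    normB += b[i] * b[i];
  }
  return dot / (Math.sqrt(normA) * Math.sqrt(normB));
}

interface Entry { embedding: number[]; response: string }

// Return the cached response whose embedding clears the threshold, if any.
function lookup(
  queryEmbedding: number[],
  entries: Entry[],
  threshold = 0.95,
): string | null {
  for (const entry of entries) {
    if (cosineSimilarity(queryEmbedding, entry.embedding) >= threshold) {
      return entry.response;
    }
  }
  return null;
}

const entries: Entry[] = [
  { embedding: [1, 0, 0], response: 'cached answer A' },
  { embedding: [0, 1, 0], response: 'cached answer B' },
];
console.log(lookup([0.99, 0.05, 0], entries)); // near A: 'cached answer A'
console.log(lookup([0.5, 0.5, 0.5], entries)); // nothing close enough: null
```

The 0.95 threshold trades hit rate against the risk of serving a subtly wrong cached answer; tune it against your own traffic.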

3. Create a Real-Time Monitoring Dashboard

// app/lib/monitoring/dashboard.ts
import { EventEmitter } from 'events';
import { groupBy, meanBy, sumBy } from 'es-toolkit';
import { AgentTrace } from '../evaluation/types';

interface DashboardMetrics {
  avgLatency: number;
  avgAccuracy: number;
  totalCost: number;
  errorRate: number;
  throughput: number;
  activeAgents: number;
}

export class MonitoringDashboard extends EventEmitter {
  private traces: AgentTrace[] = [];
  private metricsWindow = 60000; // 1-minute rolling window
  private updateInterval: NodeJS.Timeout;

  constructor() {
    super();

    // Recompute metrics every 5 seconds
    this.updateInterval = setInterval(() => {
      this.calculateMetrics();
    }, 5000);
  }

  addTrace(trace: AgentTrace) {
    this.traces.push(trace);

    // Keep only traces inside the window
    const cutoff = Date.now() - this.metricsWindow;
    this.traces = this.traces.filter(t =>
      new Date(t.metrics.timestamp).getTime() > cutoff
    );

    // Emit a real-time update
    this.emit('trace', trace);
  }

  private calculateMetrics() {
    if (this.traces.length === 0) return;

    const metrics: DashboardMetrics = {
      avgLatency: meanBy(this.traces, t => t.metrics.latency),
      avgAccuracy: meanBy(this.traces, t => t.metrics.accuracy),
      totalCost: sumBy(this.traces, t => t.metrics.cost),
      errorRate: this.traces.filter(t => t.errors.length > 0).length / this.traces.length,
      throughput: this.traces.length / (this.metricsWindow / 1000),
      activeAgents: new Set(this.traces.map(t => t.agentName)).size,
    };

    this.emit('metrics', metrics);
  }

  getAggregatedMetrics(groupByField: 'agentName' | 'hour' = 'agentName') {
    if (groupByField === 'hour') {
      const grouped = groupBy(this.traces, t =>
        new Date(t.metrics.timestamp).getHours().toString()
      );

      return Object.entries(grouped).map(([hour, traces]) => ({
        hour,
        avgLatency: meanBy(traces, t => t.metrics.latency),
        totalRequests: traces.length,
        errorRate: traces.filter(t => t.errors.length > 0).length / traces.length,
      }));
    }

    const grouped = groupBy(this.traces, 'agentName');

    return Object.entries(grouped).map(([agent, traces]) => ({
      agent,
      avgLatency: meanBy(traces, t => t.metrics.latency),
      avgAccuracy: meanBy(traces, t => t.metrics.accuracy),
      totalCost: sumBy(traces, t => t.metrics.cost),
      requestCount: traces.length,
    }));
  }

  getAlerts(thresholds: {
    maxLatency?: number;
    minAccuracy?: number;
    maxErrorRate?: number;
  }) {
    const alerts = [];

    const avgLatency = meanBy(this.traces, t => t.metrics.latency);
    if (thresholds.maxLatency && avgLatency > thresholds.maxLatency) {
      alerts.push({
        type: 'latency',
        severity: 'warning',
        message: `Average latency (${avgLatency}ms) exceeds the threshold (${thresholds.maxLatency}ms)`,
      });
    }

    const avgAccuracy = meanBy(this.traces, t => t.metrics.accuracy);
    if (thresholds.minAccuracy && avgAccuracy < thresholds.minAccuracy) {
      alerts.push({
        type: 'accuracy',
        severity: 'critical',
        message: `Average accuracy (${avgAccuracy}) is below the threshold (${thresholds.minAccuracy})`,
      });
    }

    const errorRate = this.traces.filter(t => t.errors.length > 0).length / this.traces.length;
    if (thresholds.maxErrorRate && errorRate > thresholds.maxErrorRate) {
      alerts.push({
        type: 'errors',
        severity: 'critical',
        message: `Error rate (${errorRate * 100}%) exceeds the threshold (${thresholds.maxErrorRate * 100}%)`,
      });
    }

    return alerts;
  }

  destroy() {
    clearInterval(this.updateInterval);
  }
}

// Global dashboard instance
export const dashboard = new MonitoringDashboard();

The real-time dashboard aggregates and monitors agent metrics as they stream in.
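The window-then-aggregate step can be sketched without the EventEmitter plumbing. `MiniTrace` here is a flattened, illustrative stand-in for `AgentTrace`; the aggregation mirrors what `calculateMetrics` computes.

```typescript
// Keep only traces inside the rolling window, then compute means,
// sums, and rates over what remains.

interface MiniTrace {
  agentName: string;
  latency: number;   // ms
  cost: number;      // USD
  errors: string[];
  timestamp: number; // epoch ms
}

function windowedMetrics(traces: MiniTrace[], now: number, windowMs: number) {
  const recent = traces.filter(t => now - t.timestamp <= windowMs);
  if (recent.length === 0) return null;
  const mean = (xs: number[]) => xs.reduce((a, b) => a + b, 0) / xs.length;
  return {
    avgLatency: mean(recent.map(t => t.latency)),
    totalCost: recent.reduce((sum, t) => sum + t.cost, 0),
    errorRate: recent.filter(t => t.errors.length > 0).length / recent.length,
    throughput: recent.length / (windowMs / 1000), // requests per second
    activeAgents: new Set(recent.map(t => t.agentName)).size,
  };
}

const now = 100_000;
const traces: MiniTrace[] = [
  { agentName: 'support', latency: 200, cost: 0.01, errors: [], timestamp: now - 10_000 },
  { agentName: 'support', latency: 400, cost: 0.02, errors: ['timeout'], timestamp: now - 20_000 },
  { agentName: 'billing', latency: 300, cost: 0.01, errors: [], timestamp: now - 90_000 }, // outside window
];
console.log(windowedMetrics(traces, now, 60_000));
```

With a 60-second window the billing trace drops out, so only the two support traces contribute to the averages.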

4. Implement an A/B Testing Framework

// app/lib/testing/ab-testing.ts
import { ChatGoogleGenerativeAI } from '@langchain/google-genai';
import { z } from 'zod';

const ABTestConfigSchema = z.object({
  testId: z.string(),
  variants: z.array(z.object({
    id: z.string(),
    model: z.string().optional(),
    temperature: z.number().optional(),
    systemPrompt: z.string().optional(),
    weight: z.number().default(1),
  })),
  metrics: z.array(z.enum(['accuracy', 'latency', 'cost', 'user_satisfaction'])),
  minSampleSize: z.number().default(100),
});

type ABTestConfig = z.infer<typeof ABTestConfigSchema>;

const TestResultSchema = z.object({
  variantId: z.string(),
  metrics: z.record(z.number()),
  sampleSize: z.number(),
  confidence: z.number(),
});

type TestResult = z.infer<typeof TestResultSchema>;

export class ABTestingFramework {
  private activeTests = new Map<string, ABTestConfig>();
  private results = new Map<string, Map<string, TestResult>>();

  createTest(config: ABTestConfig) {
    this.activeTests.set(config.testId, config);
    this.results.set(config.testId, new Map());

    // Initialize results for each variant
    config.variants.forEach(variant => {
      this.results.get(config.testId)!.set(variant.id, {
        variantId: variant.id,
        metrics: {},
        sampleSize: 0,
        confidence: 0,
      });
    });
  }

  selectVariant(testId: string): string | null {
    const test = this.activeTests.get(testId);
    if (!test) return null;

    // Weighted random selection
    const weights = test.variants.map(v => v.weight);
    const totalWeight = weights.reduce((a, b) => a + b, 0);

    let random = Math.random() * totalWeight;
    for (let i = 0; i < test.variants.length; i++) {
      random -= weights[i];
      if (random <= 0) {
        return test.variants[i].id;
      }
    }

    return test.variants[0].id;
  }

  async runVariant(testId: string, variantId: string, input: string) {
    const test = this.activeTests.get(testId);
    const variant = test?.variants.find(v => v.id === variantId);

    if (!variant) throw new Error('Variant not found');

    // Create a model from the variant's configuration
    const model = new ChatGoogleGenerativeAI({
      modelName: variant.model || 'gemini-2.5-pro',
      temperature: variant.temperature ?? 0.7,
      apiKey: process.env.GOOGLE_API_KEY,
    });

    const startTime = Date.now();

    const messages = variant.systemPrompt
      ? [
          { role: 'system', content: variant.systemPrompt },
          { role: 'user', content: input },
        ]
      : [{ role: 'user', content: input }];

    const response = await model.invoke(messages);

    const latency = Date.now() - startTime;

    return {
      response: response.content,
      metrics: {
        latency,
        // other metrics are filled in after evaluation
      },
    };
  }

  recordResult(
    testId: string,
    variantId: string,
    metrics: Record<string, number>
  ) {
    const results = this.results.get(testId);
    const variantResult = results?.get(variantId);

    if (!variantResult) return;

    // Update the running averages
    const n = variantResult.sampleSize;
    Object.entries(metrics).forEach(([key, value]) => {
      const current = variantResult.metrics[key] || 0;
      variantResult.metrics[key] = (current * n + value) / (n + 1);
    });

    variantResult.sampleSize++;

    // Estimate confidence (simplified; use a proper statistical test in production)
    variantResult.confidence = Math.min(
      variantResult.sampleSize / 100,
      0.95
    );
  }

  getResults(testId: string): TestResult[] {
    const results = this.results.get(testId);
    if (!results) return [];

    return Array.from(results.values());
  }

  determineWinner(testId: string): string | null {
    const results = this.getResults(testId);
    const test = this.activeTests.get(testId);

    if (!test || results.length === 0) return null;

    // Require every variant to reach the minimum sample size
    const allHaveMinSamples = results.every(
      r => r.sampleSize >= test.minSampleSize
    );

    if (!allHaveMinSamples) return null;

    // Pick the winner on the primary metric (first in the list)
    const primaryMetric = test.metrics[0];

    const winner = results.reduce((best, current) => {
      const bestScore = best.metrics[primaryMetric] || 0;
      const currentScore = current.metrics[primaryMetric] || 0;

      // For latency and cost, lower is better
      if (primaryMetric === 'latency' || primaryMetric === 'cost') {
        return currentScore < bestScore ? current : best;
      }

      return currentScore > bestScore ? current : best;
    });

    // Require sufficiently high confidence
    if (winner.confidence >= 0.95) {
      return winner.variantId;
    }

    return null;
  }
}

An A/B testing framework for comparing agent configurations.
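Two pieces of the framework are worth seeing in isolation: weighted variant selection and the incremental running-mean update used by `recordResult`. This sketch makes the random draw injectable so the behavior is deterministic; the names are illustrative.

```typescript
// Weighted variant selection plus the running-mean update, isolated
// from the class so both are directly testable.

interface Variant { id: string; weight: number }

// Pick a variant with probability proportional to its weight.
// `rand` is a uniform draw in [0, 1), injectable for determinism.
function selectVariant(variants: Variant[], rand = Math.random()): string {
  const totalWeight = variants.reduce((sum, v) => sum + v.weight, 0);
  let r = rand * totalWeight;
  for (const variant of variants) {
    r -= variant.weight;
    if (r <= 0) return variant.id;
  }
  return variants[0].id;
}

// Update a running mean without storing every sample:
// newMean = (oldMean * n + value) / (n + 1)
function updateRunningMean(mean: number, n: number, value: number): number {
  return (mean * n + value) / (n + 1);
}

const variants: Variant[] = [
  { id: 'control', weight: 3 },
  { id: 'treatment', weight: 1 },
];
console.log(selectVariant(variants, 0.5)); // 0.5 * 4 = 2 falls in 'control'
console.log(selectVariant(variants, 0.9)); // 0.9 * 4 = 3.6 falls in 'treatment'

let mean = 0;
[100, 200, 300].forEach((v, i) => { mean = updateRunningMean(mean, i, v); });
console.log(mean); // 200
```

The running-mean form is what lets `recordResult` keep per-variant metrics in O(1) memory regardless of sample count.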

5. Create a Production Monitoring API

// app/api/monitoring/route.ts
import { NextRequest, NextResponse } from 'next/server';
import { dashboard } from '@/lib/monitoring/dashboard';
import { SemanticCache } from '@/lib/cache/semantic-cache';
import { ABTestingFramework } from '@/lib/testing/ab-testing';
import { tracer } from '@/lib/tracing/distributed-tracer';

const cache = new SemanticCache();
const abTesting = new ABTestingFramework();

// Initialize an A/B test comparing two Gemini variants
// (runVariant instantiates ChatGoogleGenerativeAI, so the models must be Gemini)
abTesting.createTest({
  testId: 'model-comparison',
  variants: [
    { id: 'gemini-pro', model: 'gemini-2.5-pro', weight: 1 },
    { id: 'gemini-flash', model: 'gemini-2.5-flash', weight: 1 },
  ],
  metrics: ['accuracy', 'latency', 'cost'],
  minSampleSize: 50,
});

export async function GET(request: NextRequest) {
  const { searchParams } = new URL(request.url);
  const view = searchParams.get('view') || 'metrics';

  switch (view) {
    case 'metrics':
      const metrics = dashboard.getAggregatedMetrics('agentName');
      return NextResponse.json({ metrics });

    case 'alerts':
      const alerts = dashboard.getAlerts({
        maxLatency: 5000,
        minAccuracy: 0.8,
        maxErrorRate: 0.1,
      });
      return NextResponse.json({ alerts });

    case 'cache':
      const cacheStats = await cache.getCacheStats();
      return NextResponse.json({ cacheStats });

    case 'ab-test':
      const results = abTesting.getResults('model-comparison');
      const winner = abTesting.determineWinner('model-comparison');
      return NextResponse.json({ results, winner });

    default:
      return NextResponse.json({ error: 'Invalid view' }, { status: 400 });
  }
}

export async function POST(request: NextRequest) {
  const span = tracer.traceAgent('monitoring-api', 'process-request');

  try {
    const { message, testId } = await request.json();

    // Check the semantic cache first
    const cached = await cache.get(message);
    if (cached) {
      tracer.recordMetrics(span, { cache_hit: 1 });
      tracer.endSpan(span, 'success');

      return NextResponse.json({
        response: cached,
        source: 'cache',
      });
    }

    // Select an A/B test variant
    const variantId = abTesting.selectVariant(testId || 'model-comparison');

    if (variantId) {
      const result = await abTesting.runVariant(
        testId || 'model-comparison',
        variantId,
        message
      );

      // Record the variant's metrics
      abTesting.recordResult(
        testId || 'model-comparison',
        variantId,
        result.metrics
      );

      // Cache the response
      await cache.set(message, result.response as string);

      tracer.recordMetrics(span, {
        variant: variantId,
        latency: result.metrics.latency,
      });
      tracer.endSpan(span, 'success');

      return NextResponse.json({
        response: result.response,
        variant: variantId,
        metrics: result.metrics,
      });
    }

    tracer.endSpan(span, 'error', new Error('No variant selected'));
    return NextResponse.json(
      { error: 'Test misconfigured' },
      { status: 500 }
    );

  } catch (error) {
    tracer.endSpan(span, 'error', error as Error);
    return NextResponse.json(
      { error: 'Processing failed' },
      { status: 500 }
    );
  }
}

A production API with caching, A/B testing, and distributed tracing.

6. Create a Frontend Dashboard Component

// app/components/monitoring-dashboard.tsx
'use client';

import { useQuery } from '@tanstack/react-query';
import { useState } from 'react';

interface DashboardData {
  metrics?: any[];
  alerts?: any[];
  cacheStats?: any;
  results?: any[];
  winner?: string;
}

export default function MonitoringDashboard() {
  const [view, setView] = useState<'metrics' | 'alerts' | 'cache' | 'ab-test'>('metrics');

  const { data, isLoading } = useQuery<DashboardData>({
    queryKey: ['monitoring', view],
    queryFn: async () => {
      const response = await fetch(`/api/monitoring?view=${view}`);
      return response.json();
    },
    refetchInterval: 5000, // refresh every 5 seconds
  });

  if (isLoading) return <div className="loading loading-spinner" />;

  return (
    <div className="p-6">
      <div className="tabs tabs-boxed mb-4">
        <button
          className={`tab ${view === 'metrics' ? 'tab-active' : ''}`}
          onClick={() => setView('metrics')}
        >
          Metrics
        </button>
        <button
          className={`tab ${view === 'alerts' ? 'tab-active' : ''}`}
          onClick={() => setView('alerts')}
        >
          Alerts
        </button>
        <button
          className={`tab ${view === 'cache' ? 'tab-active' : ''}`}
          onClick={() => setView('cache')}
        >
          Cache
        </button>
        <button
          className={`tab ${view === 'ab-test' ? 'tab-active' : ''}`}
          onClick={() => setView('ab-test')}
        >
          A/B Tests
        </button>
      </div>

      {view === 'metrics' && data?.metrics && (
        <div className="grid grid-cols-2 gap-4">
          {data.metrics.map((metric: any) => (
            <div key={metric.agent} className="card bg-base-200 p-4">
              <h3 className="text-lg font-bold">{metric.agent}</h3>
              <div className="stats stats-vertical">
                <div className="stat">
                  <div className="stat-title">Avg Latency</div>
                  <div className="stat-value text-2xl">{metric.avgLatency.toFixed(0)}ms</div>
                </div>
                <div className="stat">
                  <div className="stat-title">Accuracy</div>
                  <div className="stat-value text-2xl">{(metric.avgAccuracy * 100).toFixed(1)}%</div>
                </div>
                <div className="stat">
                  <div className="stat-title">Total Cost</div>
                  <div className="stat-value text-2xl">${metric.totalCost.toFixed(2)}</div>
                </div>
              </div>
            </div>
          ))}
        </div>
      )}

      {view === 'alerts' && data?.alerts && (
        <div className="space-y-2">
          {data.alerts.length === 0 ? (
            <div className="alert alert-success">
              <span>All systems operating normally</span>
            </div>
          ) : (
            data.alerts.map((alert: any, idx: number) => (
              <div key={idx} className={`alert alert-${alert.severity === 'critical' ? 'error' : 'warning'}`}>
                <span>{alert.message}</span>
              </div>
            ))
          )}
        </div>
      )}

      {view === 'cache' && data?.cacheStats && (
        <div className="stats shadow">
          <div className="stat">
            <div className="stat-title">Cache Entries</div>
            <div className="stat-value">{data.cacheStats.totalEntries}</div>
          </div>
          <div className="stat">
            <div className="stat-title">Total Hits</div>
            <div className="stat-value">{data.cacheStats.totalHits}</div>
          </div>
          <div className="stat">
            <div className="stat-title">Cost Saved</div>
            <div className="stat-value">${data.cacheStats.costSaved.toFixed(2)}</div>
          </div>
        </div>
      )}

      {view === 'ab-test' && data?.results && (
        <div className="space-y-4">
          <div className="grid grid-cols-2 gap-4">
            {data.results.map((result: any) => (
              <div key={result.variantId} className="card bg-base-200 p-4">
                <h3 className="text-lg font-bold">Variant: {result.variantId}</h3>
                <div className="stats stats-vertical">
                  <div className="stat">
                    <div className="stat-title">Sample Size</div>
                    <div className="stat-value">{result.sampleSize}</div>
                  </div>
                  <div className="stat">
                    <div className="stat-title">Confidence</div>
                    <div className="stat-value">{(result.confidence * 100).toFixed(1)}%</div>
                  </div>
                </div>
              </div>
            ))}
          </div>
          {data.winner && (
            <div className="alert alert-success">
              <span>Winner: {data.winner}</span>
            </div>
          )}
        </div>
      )}
    </div>
  );
}

A real-time dashboard component for monitoring agent performance.

Conclusion

We've built a comprehensive evaluation and monitoring system that handles the complexity of non-deterministic agent behavior. The basic example showed essential monitoring with LLM-based evaluation, while the advanced example demonstrated production-ready patterns: distributed tracing, semantic caching, A/B testing, and real-time dashboards. Together these patterns let you deploy agent systems with confidence, maintain visibility into their behavior, and continuously optimize performance against real data.