Mastering Zod Validation in MCP Servers with vercel/mcp-handler

mcp zod typescript validation ai
By sko X opus 4.1 · 9/19/2025 · 23 min read

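Zod is at the core of type-safe validation in MCP servers. When you build with vercel/mcp-handler, Zod schemas define the contract between the AI model and your tools, ensuring data integrity and providing rich TypeScript type inference. This guide covers the Zod features you are most likely to need, with practical, MCP-specific examples.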

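Core Zod Concepts for MCP Servers

Think of the Zod schemas in an MCP server as an API contract for the AI. Every tool you expose needs clear input validation, just as a REST endpoint needs request validation, with the added complication that the inputs are generated dynamically by an AI model.

// Traditional API: a human developer reads the docs and sends correct data
// MCP tool: an AI model reads the schema and generates matching data
// Zod enforces the contract on both sides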
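
To make the contract idea concrete, here is a minimal sketch that uses plain Zod outside of any MCP server. The `searchArgs` schema and the sample payloads are hypothetical and only serve as illustration; `safeParse` and `z.infer` are standard Zod APIs.

```typescript
import { z } from "zod";

// Hypothetical tool input: what the model is allowed to send.
const searchArgs = z.object({
  query: z.string().min(1, "Query must not be empty"),
  limit: z.number().int().min(1).max(50).default(10),
});

// The static type is derived from the schema, so the two cannot drift apart.
type SearchArgs = z.infer<typeof searchArgs>; // { query: string; limit: number }

// A well-formed payload: the default for `limit` is filled in.
const ok = searchArgs.safeParse({ query: "zod" });

// A malformed payload, as a model might produce: wrong type for `limit`.
const bad = searchArgs.safeParse({ query: "zod", limit: "ten" });

if (ok.success) {
  const args: SearchArgs = ok.data;
  console.log(args.limit);
}
if (!bad.success) {
  // Each issue carries the path of the offending field.
  console.log(bad.error.issues.map((issue) => issue.path.join(".")));
}
```

Using `safeParse` rather than `parse` keeps the failure as a value, which is convenient when the error has to be reported back to the model instead of crashing the handler.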

Basic Type Validation Patterns

String Validation with Business Rules

import { createMcpHandler } from "mcp-handler";
import { z } from "zod";

const handler = createMcpHandler((server) => {
  server.tool(
    "user_registration",
    "Register a new user with validated inputs",
    {
      // Email with custom domain validation
      email: z
        .string()
        .email("Invalid email format")
        .refine(
          (email) => email.endsWith("@company.com") || email.endsWith("@partner.org"),
          "Email must be from company.com or partner.org domain"
        ),

      // Username with several requirements
      username: z
        .string()
        .min(3, "Username must be at least 3 characters")
        .max(20, "Username cannot exceed 20 characters")
        .regex(/^[a-zA-Z0-9_-]+$/, "Username can only contain letters, numbers, underscore, and hyphen")
        .transform((val) => val.toLowerCase()), // Always lowercase

      // Password with security requirements
      password: z
        .string()
        .min(8, "Password must be at least 8 characters")
        .regex(/[A-Z]/, "Password must contain at least one uppercase letter")
        .regex(/[a-z]/, "Password must contain at least one lowercase letter")
        .regex(/[0-9]/, "Password must contain at least one number")
        .regex(/[^A-Za-z0-9]/, "Password must contain at least one special character"),

      // Optional bio with a length limit
      bio: z
        .string()
        .max(500, "Bio cannot exceed 500 characters")
        .optional()
        .transform((val) => val?.trim()), // Trim whitespace if provided
    },
    async ({ email, username, password, bio }) => {
      // All inputs are validated and typed
      const user = await createUser({ email, username, password, bio });
      return {
        content: [{
          type: "text",
          text: `User ${username} created successfully with ID: ${user.id}`
        }],
      };
    }
  );
});

Number Validation with Ranges

server.tool(
  "financial_transaction",
  "Process financial transactions with strict validation",
  {
    amount: z
      .number()
      .positive("Amount must be positive")
      .multipleOf(0.01, "Amount must be to 2 decimal places")
      .max(1000000, "Single transaction limit exceeded"),

    quantity: z
      .number()
      .int("Quantity must be a whole number")
      .min(1, "Quantity must be at least 1")
      .max(100, "Maximum quantity is 100"),

    percentage: z
      .number()
      .min(0, "Percentage cannot be negative")
      .max(100, "Percentage cannot exceed 100")
      .transform((val) => val / 100), // Convert to decimal

    temperature: z
      .number()
      .finite("Temperature must be a finite number")
      .refine(
        (val) => val >= -273.15,
        "Temperature cannot be below absolute zero"
      ),
  },
  async (params) => {
    // Process the validated transaction
    return { content: [{ type: "text", text: "Transaction processed" }] };
  }
);

Boolean and Date Validation

server.tool(
  "schedule_meeting",
  "Schedule a meeting with date/time validation",
  {
    title: z.string().min(1, "Title is required"),

    // Date validation with business logic
    startDate: z
      .string()
      .datetime({ message: "Invalid datetime format" })
      .refine((date) => {
        const d = new Date(date);
        return d > new Date();
      }, "Start date must be in the future")
      .transform((str) => new Date(str)),

    // Duration in minutes
    duration: z
      .number()
      .int()
      .min(15, "Minimum meeting duration is 15 minutes")
      .max(480, "Maximum meeting duration is 8 hours"),

    // Boolean flags with defaults
    isRecurring: z.boolean().default(false),
    sendReminders: z.boolean().default(true),
    requiresApproval: z.boolean().optional(),

    // Coerce strings to booleans (useful for form data)
    isPublic: z
      .string()
      .transform((val) => val === "true" || val === "1")
      .or(z.boolean())
      .default(false),
  },
  async (meeting) => {
    // meeting.startDate is now a Date object
    const scheduled = await scheduleMeeting(meeting);
    return {
      content: [{
        type: "text",
        text: `Meeting scheduled for ${meeting.startDate.toISOString()}`
      }],
    };
  }
);

Advanced Object Patterns

Nested Objects with Validation

server.tool(
  "create_project",
  "Create a project with complex nested structure",
  {
    project: z.object({
      name: z.string().min(1).max(100),
      description: z.string().optional(),

      // Nested object for settings
      settings: z.object({
        visibility: z.enum(["public", "private", "team"]),
        autoArchive: z.boolean().default(false),
        maxMembers: z.number().int().min(1).max(1000).default(50),
      }),

      // Nested object with interdependent validation
      budget: z.object({
        amount: z.number().positive(),
        currency: z.enum(["USD", "EUR", "GBP"]),
        allocated: z.number().default(0),
      }).refine(
        (budget) => budget.allocated <= budget.amount,
        "Allocated budget cannot exceed total amount"
      ),

      // Array of nested objects
      milestones: z.array(
        z.object({
          title: z.string(),
          dueDate: z.string().datetime(),
          status: z.enum(["pending", "in_progress", "completed"]).default("pending"),
        })
      ).min(1, "At least one milestone is required")
       .max(20, "Maximum 20 milestones allowed"),
    }),
  },
  async ({ project }) => {
    const created = await createProject(project);
    return {
      content: [{
        type: "text",
        text: `Project "${project.name}" created with ${project.milestones.length} milestones`
      }],
    };
  }
);

Dynamic Object Keys with Records

server.tool(
  "update_metadata",
  "Update metadata with dynamic key-value pairs",
  {
    entityId: z.string().uuid("Invalid entity ID format"),

    // Record with string keys and mixed value types
    metadata: z.record(
      z.string().regex(/^[a-z_][a-z0-9_]*$/i, "Invalid metadata key format"),
      z.union([
        z.string(),
        z.number(),
        z.boolean(),
        z.null(),
      ])
    ),

    // Record with enum keys
    permissions: z.record(
      z.enum(["read", "write", "delete", "admin"]),
      z.boolean()
    ),

    // Nested record with a complex structure
    translations: z.record(
      z.string().length(2, "Language code must be 2 characters"), // e.g., "en", "fr"
      z.object({
        title: z.string(),
        description: z.string().optional(),
      })
    ),
  },
  async ({ entityId, metadata, permissions, translations }) => {
    // Type-safe access to dynamic keys
    const result = await updateEntity(entityId, { metadata, permissions, translations });
    return {
      content: [{
        type: "text",
        text: `Updated entity ${entityId} with ${Object.keys(metadata).length} metadata fields`
      }],
    };
  }
);

Arrays and Tuples

Array Validation with Constraints

server.tool(
  "batch_process",
  "Process batch operations with array validation",
  {
    // Simple array with length constraints
    ids: z
      .array(z.string().uuid())
      .min(1, "At least one ID required")
      .max(100, "Maximum 100 items per batch"),

    // Array of unique values
    tags: z
      .array(z.string())
      .refine(
        (tags) => new Set(tags).size === tags.length,
        "Tags must be unique"
      ),

    // Array of objects with complex validation
    operations: z
      .array(
        z.object({
          type: z.enum(["create", "update", "delete"]),
          data: z.record(z.any()),
          priority: z.number().int().min(1).max(10).default(5),
        })
      )
      .transform((ops) =>
        // Sort by priority, highest first (copy first so the parsed array is not mutated)
        [...ops].sort((a, b) => b.priority - a.priority)
      ),

    // Tuple for fixed-length arrays
    coordinates: z.tuple([
      z.number().min(-90).max(90),  // latitude
      z.number().min(-180).max(180), // longitude
    ]),

    // Rest elements in a tuple
    version: z.tuple([
      z.number().int(), // major
      z.number().int(), // minor
      z.number().int(), // patch
    ]).rest(z.string()), // may be followed by additional string elements
  },
  async ({ ids, tags, operations, coordinates, version }) => {
    // coordinates is typed as [number, number]
    // version is typed as [number, number, number, ...string[]]
    const results = await processBatch({ ids, tags, operations });
    return {
      content: [{
        type: "text",
        text: `Processed ${operations.length} operations at coordinates ${coordinates.join(", ")}`
      }],
    };
  }
);

Enums and Discriminated Unions

Enum Patterns

// Define enum values as constants for reuse
const TaskStatus = {
  PENDING: "pending",
  IN_PROGRESS: "in_progress",
  COMPLETED: "completed",
  CANCELLED: "cancelled",
} as const;

const TaskPriority = ["low", "medium", "high", "urgent"] as const;

server.tool(
  "task_management",
  "Manage tasks with enum validation",
  {
    action: z.enum(["create", "update", "delete", "archive"]),

    // Use an object's values as an enum
    status: z.enum(Object.values(TaskStatus) as [string, ...string[]]),

    // Use an array as an enum
    priority: z.enum(TaskPriority),

    // Optional enum with a default
    assigneeRole: z
      .enum(["developer", "designer", "manager", "qa"])
      .optional()
      .default("developer"),

    // Transform enum values
    visibility: z
      .enum(["0", "1", "2"]) // arrives as a string
      .transform((val) => {
        const map = { "0": "private", "1": "team", "2": "public" };
        return map[val as keyof typeof map];
      }),
  },
  async (params) => {
    const result = await manageTask(params);
    return {
      content: [{
        type: "text",
        text: `Task ${params.action}d with status: ${params.status}`
      }],
    };
  }
);

Discriminated Unions for Complex Types

server.tool(
  "process_payment",
  "Process different payment types with discriminated unions",
  {
    payment: z.discriminatedUnion("type", [
      z.object({
        type: z.literal("credit_card"),
        cardNumber: z.string().regex(/^\d{16}$/, "Card number must be 16 digits"),
        cvv: z.string().regex(/^\d{3,4}$/, "CVV must be 3-4 digits"),
        expiryMonth: z.number().int().min(1).max(12),
        expiryYear: z.number().int().min(new Date().getFullYear()),
      }),
      z.object({
        type: z.literal("bank_transfer"),
        accountNumber: z.string(),
        routingNumber: z.string(),
        accountType: z.enum(["checking", "savings"]),
      }),
      z.object({
        type: z.literal("crypto"),
        walletAddress: z.string().regex(/^0x[a-fA-F0-9]{40}$/, "Invalid wallet address"),
        network: z.enum(["ethereum", "polygon", "arbitrum"]),
        tokenAddress: z.string().optional(),
      }),
      z.object({
        type: z.literal("paypal"),
        email: z.string().email(),
        paypalId: z.string().optional(),
      }),
    ]),

    amount: z.number().positive(),
    currency: z.string().length(3, "Currency code must be 3 characters"),
  },
  async ({ payment, amount, currency }) => {
    // TypeScript knows the exact shape from payment.type
    if (payment.type === "credit_card") {
      // payment.cardNumber is available here
      return await processCreditCard(payment, amount, currency);
    } else if (payment.type === "crypto") {
      // payment.walletAddress is available here
      return await processCrypto(payment, amount, currency);
    }
    // ... handle the other payment types

    return {
      content: [{
        type: "text",
        text: `Payment of ${amount} ${currency} processed via ${payment.type}`
      }],
    };
  }
);
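
One way to make sure every variant of a discriminated union is handled is an exhaustive `switch` with a `never` check. The sketch below uses a hypothetical two-variant `payment` schema (smaller than the one above) and an illustrative `describePayment` helper; if a new variant is added to the schema and not handled, the `never` assignment stops compiling.

```typescript
import { z } from "zod";

// Hypothetical, reduced version of a payment union.
const payment = z.discriminatedUnion("type", [
  z.object({ type: z.literal("paypal"), email: z.string() }),
  z.object({
    type: z.literal("crypto"),
    walletAddress: z.string(),
    network: z.enum(["ethereum", "polygon"]),
  }),
]);

type Payment = z.infer<typeof payment>;

// Illustrative helper: each case sees only the fields of its own variant.
function describePayment(p: Payment): string {
  switch (p.type) {
    case "paypal":
      return `PayPal account ${p.email}`;
    case "crypto":
      return `${p.network} wallet ${p.walletAddress}`;
    default: {
      // Compile-time guard: fails to type-check if a variant is unhandled.
      const unreachable: never = p;
      throw new Error(`Unhandled payment type: ${JSON.stringify(unreachable)}`);
    }
  }
}

const parsed = payment.parse({
  type: "crypto",
  walletAddress: "0xabc",
  network: "polygon",
});
const summary = describePayment(parsed);

// An unknown discriminator value is rejected at validation time.
const unknownTypeAccepted = payment.safeParse({ type: "cheque" }).success;
```

The `never` check is a TypeScript idiom rather than a Zod feature; it complements the runtime validation by catching missing branches at compile time.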

Transforms and Preprocessing

Data Transformation Pipelines

server.tool(
  "import_data",
  "Import and transform data with preprocessing",
  {
    // Convert a numeric string to a number, rejecting NaN results
    userId: z
      .string()
      .transform((val) => parseInt(val, 10))
      .pipe(z.number().int()),

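    // Parse a JSON string
    config: z
      .string()
      .transform((str, ctx) => {
        try {
          return JSON.parse(str);
        } catch {
          // Report a validation issue instead of throwing, so the failure
          // surfaces as a ZodError rather than an uncaught exception
          ctx.addIssue({
            code: z.ZodIssueCode.custom,
            message: "Invalid JSON configuration",
          });
          return z.NEVER;
        }
      })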
      .pipe(
        z.object({
          enabled: z.boolean(),
          threshold: z.number(),
        })
      ),

    // Clean and normalize data
    phone: z
      .string()
      .transform((phone) => phone.replace(/\D/g, "")) // strip non-digits
      .refine((phone) => phone.length === 10, "Phone must be 10 digits")
      .transform((phone) => `+1${phone}`), // add the country code

    // Parse comma-separated values
    tags: z
      .string()
      .transform((str) => str.split(",").map(s => s.trim()))
      .pipe(z.array(z.string().min(1))),

    // Complex transform with validation
    dateRange: z
      .string()
      .transform((str) => {
        const [start, end] = str.split(" to ").map(d => new Date(d.trim()));
        return { start, end };
      })
      .refine(
        ({ start, end }) => start < end,
        "Start date must be before end date"
      ),
  },
  async (data) => {
    // All data is transformed and validated
    const result = await importData(data);
    return {
      content: [{
        type: "text",
        text: `Imported data for user ${data.userId} with ${data.tags.length} tags`
      }],
    };
  }
);
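
A transform changes the type a handler receives, so the schema's input and output types can differ. The following sketch shows this with a hypothetical `csvTags` field; `z.input` and `z.output` are the standard Zod helpers for reading both sides.

```typescript
import { z } from "zod";

// Hypothetical field: a comma-separated tag list sent as one string.
const csvTags = z
  .string()
  .transform((raw) => raw.split(",").map((tag) => tag.trim()))
  .pipe(z.array(z.string().min(1, "Empty tag")));

type TagsInput = z.input<typeof csvTags>; // string: what the caller sends
type TagsOutput = z.output<typeof csvTags>; // string[]: what the handler receives

const rawTags: TagsInput = "alpha, beta ,gamma";
const tags: TagsOutput = csvTags.parse(rawTags);

// The piped schema still validates the transformed value,
// so an empty entry between two commas is rejected.
const withEmptyTag = csvTags.safeParse("alpha,,gamma");
```

Piping into a second schema after the transform is what keeps the result validated; a bare `.transform()` would accept whatever the function returns.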

Preprocessing for Normalization

server.tool(
  "search_products",
  "Search products with input preprocessing",
  {
    // Preprocess to handle various input formats
    query: z.preprocess(
      (val) => {
        if (typeof val === "string") return val.trim().toLowerCase();
        if (typeof val === "number") return String(val);
        return val;
      },
      z.string().min(1, "Search query required")
    ),

    // Coerce and validate the price range
    minPrice: z.preprocess(
      (val) => (val === "" || val === null ? undefined : Number(val)),
      z.number().min(0).optional()
    ),

    maxPrice: z.preprocess(
      (val) => (val === "" || val === null ? undefined : Number(val)),
      z.number().positive().optional()
    ),

    // Handle boolean-like values
    inStock: z.preprocess(
      (val) => {
        if (typeof val === "boolean") return val;
        if (val === "true" || val === "1" || val === 1) return true;
        if (val === "false" || val === "0" || val === 0) return false;
        return undefined;
      },
      z.boolean().optional()
    ),

    // Parse and validate date strings
    availableAfter: z.preprocess(
      (val) => {
        if (!val) return undefined;
        const date = new Date(String(val));
        return isNaN(date.getTime()) ? undefined : date;
      },
      z.date().optional()
    ),
  },
  async (params) => {
    const products = await searchProducts(params);
    return {
      content: [{
        type: "text",
        text: `Found ${products.length} products matching "${params.query}"`
      }],
    };
  }
);

Refinements and Custom Validation

Basic Refinements

server.tool(
  "create_event",
  "Create event with complex validation rules",
  {
    name: z.string(),
    startTime: z.string().datetime(),
    endTime: z.string().datetime(),
    maxAttendees: z.number().int().positive(),
    currentAttendees: z.number().int().min(0).default(0),

    // Object-level refinement
    location: z.object({
      type: z.enum(["physical", "virtual", "hybrid"]),
      address: z.string().optional(),
      url: z.string().url().optional(),
      capacity: z.number().int().positive().optional(),
    }).refine(
      (loc) => {
        if (loc.type === "physical" && !loc.address) {
          return false;
        }
        if (loc.type === "virtual" && !loc.url) {
          return false;
        }
        return true;
      },
      {
        message: "Physical events need address, virtual events need URL",
      }
    ),
  },
  async (eventData) => {
    const event = await createEvent(eventData);
    return {
      content: [{
        type: "text",
        text: `Event "${eventData.name}" created successfully`
      }],
    };
  }
);

SuperRefine for Multiple Validations

server.tool(
  "configure_deployment",
  "Configure deployment with interdependent validations",
  {
    environment: z.enum(["development", "staging", "production"]),
    replicas: z.number().int().positive(),
    memory: z.number().positive(), // in GB
    cpu: z.number().positive(), // in cores
    autoScaling: z.boolean(),
    minReplicas: z.number().int().positive().optional(),
    maxReplicas: z.number().int().positive().optional(),
    customDomain: z.string().optional(),
    ssl: z.boolean().optional(),
  },
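  async (config) => {
    // The raw shape above only validates fields one at a time. Cross-field
    // rules live in deploymentSchema (defined below), so apply it here.
    const checked = deploymentSchema.safeParse(config);
    if (!checked.success) {
      const details = checked.error.issues.map((issue) => issue.message).join("; ");
      return { content: [{ type: "text", text: `Invalid deployment: ${details}` }], isError: true };
    }
    const deployment = await configureDeployment(checked.data);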
    return {
      content: [{
        type: "text",
        text: `Deployment configured for ${config.environment} with ${config.replicas} replicas`
      }],
    };
  }
);

// Add superRefine to an object schema for complex multi-field validation
const deploymentSchema = z
  .object({
    environment: z.enum(["development", "staging", "production"]),
    replicas: z.number().int().positive(),
    memory: z.number().positive(),
    cpu: z.number().positive(),
    autoScaling: z.boolean(),
    minReplicas: z.number().int().positive().optional(),
    maxReplicas: z.number().int().positive().optional(),
    customDomain: z.string().optional(),
    ssl: z.boolean().optional(),
  })
  .superRefine((data, ctx) => {
    // Production-specific validation
    if (data.environment === "production") {
      if (data.replicas < 2) {
        ctx.addIssue({
          code: z.ZodIssueCode.custom,
          message: "Production must have at least 2 replicas",
          path: ["replicas"],
        });
      }

      if (data.memory < 4) {
        ctx.addIssue({
          code: z.ZodIssueCode.too_small,
          message: "Production requires at least 4GB memory",
          path: ["memory"],
          minimum: 4,
          inclusive: true,
          type: "number",
        });
      }
    }

    // Auto-scaling validation
    if (data.autoScaling) {
      if (!data.minReplicas || !data.maxReplicas) {
        ctx.addIssue({
          code: z.ZodIssueCode.custom,
          message: "Auto-scaling requires both minReplicas and maxReplicas",
        });
      }

      if (data.minReplicas && data.maxReplicas && data.minReplicas > data.maxReplicas) {
        ctx.addIssue({
          code: z.ZodIssueCode.custom,
          message: "minReplicas cannot be greater than maxReplicas",
          path: ["minReplicas"],
        });
      }
    }

    // A custom domain requires SSL
    if (data.customDomain && !data.ssl) {
      ctx.addIssue({
        code: z.ZodIssueCode.custom,
        message: "Custom domain requires SSL to be enabled",
        path: ["ssl"],
      });
    }

    // Resource ratio validation
    const cpuMemoryRatio = data.cpu / data.memory;
    if (cpuMemoryRatio > 1) {
      ctx.addIssue({
        code: z.ZodIssueCode.custom,
        message: "CPU cores should not exceed memory (GB)",
      });
    }
  });
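
The practical benefit of `superRefine` over a single `refine` is that one parse can report several independent problems at once. Here is a minimal sketch with a hypothetical, reduced `scalingSchema`; the field names mirror the schema above, but the sample input is made up.

```typescript
import { z } from "zod";

// Hypothetical, reduced deployment schema with two cross-field rules.
const scalingSchema = z
  .object({
    autoScaling: z.boolean(),
    minReplicas: z.number().int().positive().optional(),
    maxReplicas: z.number().int().positive().optional(),
    customDomain: z.string().optional(),
    ssl: z.boolean().optional(),
  })
  .superRefine((data, ctx) => {
    if (data.autoScaling && (data.minReplicas === undefined || data.maxReplicas === undefined)) {
      ctx.addIssue({
        code: "custom",
        message: "Auto-scaling requires both minReplicas and maxReplicas",
        path: ["autoScaling"],
      });
    }
    if (data.customDomain && !data.ssl) {
      ctx.addIssue({
        code: "custom",
        message: "Custom domain requires SSL to be enabled",
        path: ["ssl"],
      });
    }
  });

// Both rules are violated by this input, so both issues come back together.
const result = scalingSchema.safeParse({
  autoScaling: true,
  customDomain: "example.com",
});

const messages = result.success
  ? []
  : result.error.issues.map((issue) => `${issue.path.join(".")}: ${issue.message}`);
```

Returning every issue in one pass lets a model correct all of its mistakes in a single retry instead of discovering them one call at a time.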

Async Refinements

server.tool(
  "register_domain",
  "Register domain with async validation",
  {
    domain: z
      .string()
      .regex(/^[a-z0-9-]+\.[a-z]{2,}$/, "Invalid domain format")
      .refine(async (domain) => {
        // Check whether the domain is available (async operation)
        const available = await checkDomainAvailability(domain);
        return available;
      }, "Domain is not available"),

    owner: z.object({
      email: z
        .string()
        .email()
        .refine(async (email) => {
          // Verify the email is not blacklisted
          const blacklisted = await checkEmailBlacklist(email);
          return !blacklisted;
        }, "Email is blacklisted"),

      organizationId: z
        .string()
        .uuid()
        .refine(async (id) => {
          // Verify the organization exists
          const exists = await organizationExists(id);
          return exists;
        }, "Organization not found"),
    }),

    duration: z.number().int().min(1).max(10), // years
  },
  async ({ domain, owner, duration }) => {
    const registration = await registerDomain({ domain, owner, duration });
    return {
      content: [{
        type: "text",
        text: `Domain ${domain} registered for ${duration} years`
      }],
    };
  }
);
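
Async refinements only work when the schema is parsed with one of Zod's async methods. The sketch below illustrates the difference with a hypothetical in-memory `isAvailable` check standing in for a real lookup. Whether a given MCP server library parses tool arguments asynchronously is worth verifying for the version you use.

```typescript
import { z } from "zod";

// Hypothetical stand-in for a network lookup.
const takenDomains = new Set(["example.com"]);
async function isAvailable(domain: string): Promise<boolean> {
  return !takenDomains.has(domain);
}

const domainSchema = z.string().refine(isAvailable, "Domain is not available");

// Synchronous parsing cannot await the refinement, so Zod throws
// instead of returning a validation result.
let syncThrew = false;
try {
  domainSchema.parse("free.dev");
} catch {
  syncThrew = true;
}

// The async variants await every refinement before resolving.
async function check(domain: string): Promise<boolean> {
  const result = await domainSchema.safeParseAsync(domain);
  return result.success;
}

check("free.dev").then((ok) => console.log("free.dev available:", ok));
check("example.com").then((ok) => console.log("example.com available:", ok));
```

Because each async refinement adds latency to every tool call, it can be worth keeping slow checks in the handler and reserving refinements for cheap ones.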

Union Types and Optional Handling

Flexible Input Types

server.tool(
  "flexible_query",
  "Handle multiple input formats with unions",
  {
    // String or numeric ID
    identifier: z.union([
      z.string().uuid(),
      z.number().int().positive(),
      z.string().regex(/^[A-Z]{2,4}-\d{6}$/), // custom format
    ]),

    // Multiple date formats
    date: z.union([
      z.string().datetime(),
      z.string().regex(/^\d{4}-\d{2}-\d{2}$/), // YYYY-MM-DD
      z.number(), // Unix timestamp
    ]).transform((val) => {
      if (typeof val === "number") return new Date(val * 1000);
      return new Date(val);
    }),

    // Optional value that accepts several types
    filter: z
      .union([
        z.string(),
        z.array(z.string()),
        z.object({
          include: z.array(z.string()).optional(),
          exclude: z.array(z.string()).optional(),
        }),
      ])
      .optional()
      .transform((filter) => {
        // Normalize to one consistent format
        if (!filter) return { include: [], exclude: [] };
        if (typeof filter === "string") return { include: [filter], exclude: [] };
        if (Array.isArray(filter)) return { include: filter, exclude: [] };
        return { include: filter.include ?? [], exclude: filter.exclude ?? [] };
      }),

    // Nullable vs. optional
    description: z.string().nullable(), // may be null
    metadata: z.record(z.any()).optional(), // may be undefined
    notes: z.string().nullish(), // may be null or undefined
  },
  async (params) => {
    const result = await executeQuery(params);
    return {
      content: [{
        type: "text",
        text: `Query executed with identifier: ${params.identifier}`
      }],
    };
  }
);

Error Handling and Custom Messages

Comprehensive Error Messages

const userInputSchema = z.object({
  username: z
    .string({
      required_error: "Username is required",
      invalid_type_error: "Username must be a string",
    })
    .min(3, { message: "Username must be at least 3 characters long" })
    .max(20, { message: "Username cannot exceed 20 characters" })
    .regex(/^[a-zA-Z0-9_]+$/, {
      message: "Username can only contain letters, numbers, and underscores"
    }),

  age: z
    .number({
      required_error: "Age is required",
      invalid_type_error: "Age must be a number",
    })
    .int({ message: "Age must be a whole number" })
    .positive({ message: "Age must be positive" })
    .max(120, { message: "Age cannot exceed 120" }),

  email: z
    .string()
    .email({ message: "Please provide a valid email address" })
    .refine(
      (email) => !email.includes("tempmail"),
      { message: "Temporary email addresses are not allowed" }
    ),
});

server.tool(
  "validate_user",
  "Validate user input with detailed error messages",
  userInputSchema.shape,
  async (userData) => {
    // If we get here, every validation has passed
    return {
      content: [{
        type: "text",
        text: `User ${userData.username} validated successfully`
      }],
    };
  }
);

Error Path Specification

const complexSchema = z
  .object({
    user: z.object({
      profile: z.object({
        firstName: z.string(),
        lastName: z.string(),
        age: z.number(),
      }),
      settings: z.object({
        notifications: z.boolean(),
        theme: z.enum(["light", "dark"]),
      }),
    }),
    permissions: z.array(z.string()),
  })
  .superRefine((data, ctx) => {
    if (data.user.profile.age < 18 && data.permissions.includes("admin")) {
      ctx.addIssue({
        code: z.ZodIssueCode.custom,
        message: "Users under 18 cannot have admin permissions",
        path: ["permissions"], // attach the error to an exact path
      });
    }

    if (data.user.profile.firstName === data.user.profile.lastName) {
      ctx.addIssue({
        code: z.ZodIssueCode.custom,
        message: "First name and last name must be different",
        path: ["user", "profile", "lastName"], // nested path
      });
    }
  });
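
Issue paths become useful once they are turned into text a model can act on. The following is a minimal sketch of such a formatter; `accountSchema` and `describeIssues` are hypothetical names, and the exact wording of Zod's built-in messages varies between versions, so only the path part is relied on here.

```typescript
import { z } from "zod";

// Hypothetical schema, smaller than the one above.
const accountSchema = z.object({
  user: z.object({
    profile: z.object({ firstName: z.string(), age: z.number() }),
  }),
  permissions: z.array(z.string()),
});

// Turn each issue into "dotted.path: message" so the failing field is obvious.
function describeIssues(error: z.ZodError): string[] {
  return error.issues.map((issue) => {
    const where = issue.path.length > 0 ? issue.path.join(".") : "(root)";
    return `${where}: ${issue.message}`;
  });
}

// Two mistakes: age is a string, and one permission is a number.
const result = accountSchema.safeParse({
  user: { profile: { firstName: "Ada", age: "36" } },
  permissions: ["read", 7],
});

const lines = result.success ? [] : describeIssues(result.error);
console.log(lines.join("\n"));
```

Array indices appear in the path as numbers, so the second issue is reported under `permissions.1`.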

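Performance Optimization Patterns

Lazy Schemas for Recursive Structures

// Define a recursive type (for example, tree structures or nested comments)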
type TreeNode = {
  id: string;
  value: any;
  children?: TreeNode[];
};

const treeNodeSchema: z.ZodType<TreeNode> = z.lazy(() =>
  z.object({
    id: z.string().uuid(),
    value: z.any(),
    children: z.array(treeNodeSchema).optional(),
  })
);

server.tool(
  "process_tree",
  "Process hierarchical tree structure",
  {
    tree: treeNodeSchema,
    maxDepth: z.number().int().positive().max(10).default(5),
  },
  async ({ tree, maxDepth }) => {
    const result = await processTreeStructure(tree, maxDepth);
    return {
      content: [{
        type: "text",
        text: `Processed tree with root ID: ${tree.id}`
      }],
    };
  }
);

Schema Composition and Reuse

// Base schemas for reuse
const addressSchema = z.object({
  street: z.string(),
  city: z.string(),
  state: z.string().length(2),
  zipCode: z.string().regex(/^\d{5}(-\d{4})?$/),
  country: z.string().default("US"),
});

const contactSchema = z.object({
  phone: z.string().regex(/^\+?[\d\s-()]+$/),
  email: z.string().email(),
  preferredContact: z.enum(["phone", "email", "both"]).default("email"),
});

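// Combine object schemas with .merge(); unlike z.intersection(), the result
// is still a ZodObject, so .extend() remains available below
const personSchema = z
  .object({
    id: z.string().uuid(),
    firstName: z.string(),
    lastName: z.string(),
    dateOfBirth: z.string().date(),
  })
  .merge(contactSchema);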

// Extend the schema for specific use cases
const employeeSchema = personSchema.extend({
  employeeId: z.string(),
  department: z.enum(["engineering", "sales", "hr", "finance"]),
  salary: z.number().positive(),
  address: addressSchema,
  emergencyContact: contactSchema.partial(), // make every field optional
});

const customerSchema = personSchema.extend({
  customerId: z.string(),
  loyaltyPoints: z.number().int().min(0).default(0),
  shippingAddress: addressSchema,
  billingAddress: addressSchema.optional(),
  preferences: z.object({
    newsletter: z.boolean().default(true),
    smsAlerts: z.boolean().default(false),
  }),
});

server.tool(
  "manage_person",
  "Manage employee or customer records",
  {
    type: z.enum(["employee", "customer"]),
    data: z.union([employeeSchema, customerSchema]),
    action: z.enum(["create", "update", "archive"]),
  },
  async ({ type, data, action }) => {
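    // `type` and `data` are separate fields, so TypeScript cannot narrow `data`
    // from `type`; the casts below assert the variant explicitly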
    if (type === "employee") {
      return await manageEmployee(data as z.infer<typeof employeeSchema>, action);
    } else {
      return await manageCustomer(data as z.infer<typeof customerSchema>, action);
    }
  }
);

Real-World MCP Tool Examples

A Complete File Processing Tool

const FileFormat = z.enum(["csv", "json", "xml", "yaml", "txt"]);
const Operation = z.enum(["parse", "validate", "transform", "merge"]);

server.tool(
  "process_files",
  "Advanced file processing with multiple operations",
  {
    files: z.array(
      z.object({
        name: z.string().regex(/^[\w\-. ]+$/, "Invalid filename"),
        format: FileFormat,
        content: z.string().max(10 * 1024 * 1024, "File size exceeds 10MB"),
        encoding: z.enum(["utf8", "base64", "ascii"]).default("utf8"),
      })
    ).min(1, "At least one file required")
     .max(10, "Maximum 10 files at once"),

    operation: Operation,

    options: z.discriminatedUnion("operation", [
      z.object({
        operation: z.literal("parse"),
        delimiter: z.string().optional(),
        headers: z.boolean().default(true),
        skipRows: z.number().int().min(0).default(0),
      }),
      z.object({
        operation: z.literal("validate"),
        schema: z.record(z.any()), // JSON Schema used for validation
        strict: z.boolean().default(false),
      }),
      z.object({
        operation: z.literal("transform"),
        transformations: z.array(
          z.object({
            field: z.string(),
            operation: z.enum(["uppercase", "lowercase", "trim", "replace", "remove"]),
            value: z.string().optional(),
          })
        ),
      }),
      z.object({
        operation: z.literal("merge"),
        mergeKey: z.string(),
        conflictResolution: z.enum(["keep_first", "keep_last", "combine"]),
      }),
    ]),

    output: z.object({
      format: FileFormat,
      compression: z.enum(["none", "gzip", "zip"]).default("none"),
      splitSize: z.number().int().positive().optional(), // KB
    }),
  },
  async ({ files, operation, options, output }) => {
    // Process the files according to the operation type
    let result;

    switch (operation) {
      case "parse":
        result = await parseFiles(files, options);
        break;
      case "validate":
        result = await validateFiles(files, options);
        break;
      case "transform":
        result = await transformFiles(files, options);
        break;
      case "merge":
        result = await mergeFiles(files, options);
        break;
    }

    // Format the output
    const formatted = await formatOutput(result, output);

    return {
      content: [{
        type: "text",
        text: `Processed ${files.length} files with ${operation} operation. Output: ${output.format}`
      }],
    };
  }
);

API Integration Tool with Rate Limiting

server.tool(
  "api_request",
  "Make API requests with validation and rate limiting",
  {
    endpoint: z
      .string()
      .url("Invalid URL format")
      .refine(
        (url) => {
          const allowedDomains = ["api.example.com", "api.partner.com"];
          const urlObj = new URL(url);
          return allowedDomains.includes(urlObj.hostname);
        },
        "API domain not whitelisted"
      ),

    method: z.enum(["GET", "POST", "PUT", "PATCH", "DELETE"]),

    headers: z
      .record(z.string())
      .default({})
      .transform((headers) => ({
        ...headers,
        "Content-Type": headers["Content-Type"] || "application/json",
        "User-Agent": "MCP-Server/1.0",
      })),

    // A field-level refinement only sees its own value, not sibling fields
    // such as `method`, so a "body required for POST/PUT/PATCH" rule has to
    // be enforced where both are in scope (for example, in the tool handler)
    body: z.any().optional(),

    auth: z.discriminatedUnion("type", [
      z.object({
        type: z.literal("bearer"),
        token: z.string().min(1),
      }),
      z.object({
        type: z.literal("api_key"),
        key: z.string().min(1),
        location: z.enum(["header", "query"]),
      }),
      z.object({
        type: z.literal("basic"),
        username: z.string(),
        password: z.string(),
      }),
      z.object({
        type: z.literal("oauth2"),
        clientId: z.string(),
        clientSecret: z.string(),
        scope: z.array(z.string()).optional(),
      }),
    ]).optional(),

    retry: z.object({
      maxAttempts: z.number().int().min(1).max(5).default(3),
      backoffMs: z.number().int().min(100).max(60000).default(1000),
      retryOn: z.array(z.number().int()).default([429, 500, 502, 503, 504]),
    }).optional(),

    timeout: z.number().int().min(1000).max(300000).default(30000), // ms
  },
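  async (params) => {
    // Cross-field rule: write methods need a body
    if (["POST", "PUT", "PATCH"].includes(params.method) && params.body === undefined) {
      return {
        content: [{ type: "text", text: "Body required for POST/PUT/PATCH requests" }],
        isError: true,
      };
    }

    // Apply rate limiting
    await rateLimiter.checkLimit(params.endpoint);

    // Build the authenticated request
    const request = buildAuthenticatedRequest(params);

    // Execute with retry logic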
    const response = await executeWithRetry(request, params.retry);

    return {
      content: [{
        type: "text",
        text: `API request to ${params.endpoint} completed with status ${response.status}`
      }],
    };
  }
);

Best Practices for MCP Zod Schemas

1. Use Descriptive Error Messages

// ❌ Bad: generic message
z.string().min(3)

// ✅ Good: specific, actionable message
z.string().min(3, "Username must be at least 3 characters long")

2. Provide Defaults Where They Make Sense

// ✅ Good: defaults for optional configuration
z.object({
  retryCount: z.number().int().default(3),
  timeout: z.number().default(30000),
  debug: z.boolean().default(false),
})

3. Use Transform for Data Normalization

// ✅ Good: normalize data early
z.object({
  email: z.string().email().transform(e => e.toLowerCase()),
  tags: z.string().transform(s => s.split(",").map(t => t.trim())),
})

4. Use Discriminated Unions for Complex Types

// ✅ Good: type-safe branching
z.discriminatedUnion("type", [
  z.object({ type: z.literal("A"), fieldA: z.string() }),
  z.object({ type: z.literal("B"), fieldB: z.number() }),
])

5. Use Refinements for Business Logic

// ✅ Good: encapsulate business rules
z.object({
  startDate: z.date(),
  endDate: z.date(),
}).refine(
  data => data.startDate < data.endDate,
  "End date must be after start date"
)

6. Create Reusable Schema Components

// ✅ Good: the DRY principle
const paginationSchema = z.object({
  page: z.number().int().positive().default(1),
  limit: z.number().int().min(1).max(100).default(20),
});

// Reuse across multiple tools
server.tool("list_users", "...", {
  ...paginationSchema.shape,
  filter: z.string().optional(),
});

7. Handle Optional vs. Nullable Appropriately

// Understand the difference:
z.string().optional()  // string | undefined
z.string().nullable()  // string | null
z.string().nullish()   // string | null | undefined
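
The difference matters at runtime as well as in the types. As a quick sketch, with a hypothetical `fields` object holding one field of each kind:

```typescript
import { z } from "zod";

// Hypothetical object with one field of each kind.
const fields = z.object({
  a: z.string().optional(), // may be omitted, but not null
  b: z.string().nullable(), // must be present, may be null
  c: z.string().nullish(), // may be omitted or null
});

// `a` and `c` are omitted, `b` is null: accepted.
const omittedOptional = fields.safeParse({ b: null }).success;

// `a` is null: rejected, because optional() does not allow null.
const nullForOptional = fields.safeParse({ a: null, b: null }).success;

// `b` is missing: rejected, because nullable() does not allow undefined.
const missingNullable = fields.safeParse({}).success;

console.log({ omittedOptional, nullForOptional, missingNullable });
```

Models often emit `null` for "no value", so `nullish()` can be the more forgiving choice for fields where either form should be accepted.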

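Conclusion

Zod provides a strong, type-safe foundation for MCP server validation. By drawing on its API, from basic primitives to discriminated unions, transforms, and refinements, you can build robust MCP tools that validate whatever data structures an AI model sends.

The key is to treat your Zod schemas as contracts: they define exactly what your tools expect, provide useful error messages when those expectations are not met, and transform data into the exact shape your business logic needs. The patterns and examples in this guide should cover most validation scenarios you will meet in an MCP server.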