在 MCP 服务器中掌握 Zod 验证与 vercel/mcp-handler
mcp · zod · typescript · validation · ai
By sko X opus 4.1•9/19/2025•23 min read
Zod 是 MCP 服务器中类型安全验证的核心。在使用 vercel/mcp-handler 构建时,Zod 模式定义了 AI 模型与您的工具之间的契约,确保数据完整性并提供丰富的 TypeScript 类型推断。本指南涵盖了您需要的各项 Zod 功能,并提供了实用的 MCP 特定示例。
MCP 服务器的核心 Zod 概念
将 MCP 中的 Zod 模式视为 AI 的 API 契约。您公开的每个工具都需要清晰的输入验证——就像 REST 端点需要请求验证一样,但增加了 AI 模型动态生成输入的复杂性。
// 传统 API:人类开发者阅读文档并发送正确数据
// MCP 工具:AI 模型读取模式并生成匹配数据
// Zod 确保契约的两端都得到执行
基本类型验证模式
带业务规则的字符串验证
import { createMcpHandler } from "mcp-handler";
import { z } from "zod";
/**
 * MCP handler that registers the `user_registration` tool.
 *
 * Each field is validated by Zod before the tool callback runs; `username`
 * is lower-cased and `bio` is trimmed as part of validation.
 */
const handler = createMcpHandler((server) => {
  // An accepted e-mail address must end with one of these suffixes.
  const allowedEmailSuffixes = ["@company.com", "@partner.org"];

  // Well-formed e-mail address restricted to the allowed domains.
  const emailField = z
    .string()
    .email("Invalid email format")
    .refine(
      (address) => allowedEmailSuffixes.some((suffix) => address.endsWith(suffix)),
      "Email must be from company.com or partner.org domain"
    );

  // 3-20 characters out of [a-zA-Z0-9_-]; lower-cased after the checks pass.
  const usernameField = z
    .string()
    .min(3, "Username must be at least 3 characters")
    .max(20, "Username cannot exceed 20 characters")
    .regex(/^[a-zA-Z0-9_-]+$/, "Username can only contain letters, numbers, underscore, and hyphen")
    .transform((raw) => raw.toLowerCase());

  // Character classes a password must contain, applied in this order.
  const passwordRules = [
    [/[A-Z]/, "Password must contain at least one uppercase letter"],
    [/[a-z]/, "Password must contain at least one lowercase letter"],
    [/[0-9]/, "Password must contain at least one number"],
    [/[^A-Za-z0-9]/, "Password must contain at least one special character"],
  ];
  const passwordField = passwordRules.reduce(
    (schema, [pattern, message]) => schema.regex(pattern, message),
    z.string().min(8, "Password must be at least 8 characters")
  );

  // Optional text of at most 500 characters; trimmed when supplied.
  const bioField = z
    .string()
    .max(500, "Bio cannot exceed 500 characters")
    .optional()
    .transform((text) => (text === undefined ? undefined : text.trim()));

  server.tool(
    "user_registration",
    "Register a new user with validated inputs",
    {
      email: emailField,
      username: usernameField,
      password: passwordField,
      bio: bioField,
    },
    async ({ email, username, password, bio }) => {
      // Inputs reaching this point have passed every rule above.
      const created = await createUser({ email, username, password, bio });
      const text = `User ${username} created successfully with ID: ${created.id}`;
      return { content: [{ type: "text", text }] };
    }
  );
});
带范围的数字验证
// Tool: financial_transaction.
// Validates four independent numeric inputs. The callback does not read them
// and always answers with the same confirmation text.
server.tool(
  "financial_transaction",
  "Process financial transactions with strict validation",
  {
    // Greater than 0, a multiple of 0.01, and at most 1,000,000.
    amount: z
      .number()
      .gt(0, "Amount must be positive")
      .multipleOf(0.01, "Amount must be to 2 decimal places")
      .lte(1000000, "Single transaction limit exceeded"),
    // Whole number between 1 and 100 inclusive.
    quantity: z
      .number()
      .int("Quantity must be a whole number")
      .gte(1, "Quantity must be at least 1")
      .lte(100, "Maximum quantity is 100"),
    // Accepted as 0-100 and handed on divided by 100.
    percentage: z
      .number()
      .gte(0, "Percentage cannot be negative")
      .lte(100, "Percentage cannot exceed 100")
      .transform((percent) => percent / 100),
    // Finite number that is not below -273.15.
    temperature: z
      .number()
      .finite("Temperature must be a finite number")
      .refine(
        (reading) => reading >= -273.15,
        "Temperature cannot be below absolute zero"
      ),
  },
  async (_validatedInput) => ({
    content: [{ type: "text", text: "Transaction processed" }],
  })
);
布尔值和日期验证
// Tool: schedule_meeting.
// Validates the meeting fields, converts `startDate` to a Date and passes the
// validated object to `scheduleMeeting`.
server.tool(
  "schedule_meeting",
  "Schedule a meeting with date/time validation",
  {
    title: z.string().min(1, "Title is required"),
    // Datetime string that must lie after "now" at validation time.
    startDate: z
      .string()
      .datetime({ message: "Invalid datetime format" })
      .refine(
        (iso) => new Date(iso).getTime() > Date.now(),
        "Start date must be in the future"
      )
      .transform((iso) => new Date(iso)),
    // Length in minutes: a whole number from 15 to 480.
    duration: z
      .number()
      .int()
      .min(15, "Minimum meeting duration is 15 minutes")
      .max(480, "Maximum meeting duration is 8 hours"),
    // Boolean flags; the first two have defaults, the third may be absent.
    isRecurring: z.boolean().default(false),
    sendReminders: z.boolean().default(true),
    requiresApproval: z.boolean().optional(),
    // Accepts a string ("true" or "1" mean true, any other string false)
    // or a real boolean; defaults to false when omitted.
    isPublic: z
      .union([
        z.string().transform((flag) => flag === "true" || flag === "1"),
        z.boolean(),
      ])
      .default(false),
  },
  async (meeting) => {
    await scheduleMeeting(meeting);
    // `meeting.startDate` is a Date here because of the transform above.
    const startsAt = meeting.startDate.toISOString();
    return {
      content: [{ type: "text", text: `Meeting scheduled for ${startsAt}` }],
    };
  }
);
高级对象模式
带验证的嵌套对象
// Tool: create_project.
// Takes a single `project` object composed of nested settings, a budget with
// a cross-field rule, and a bounded list of milestones.

// Visibility plus two options that fall back to defaults.
const projectSettingsSchema = z.object({
  visibility: z.enum(["public", "private", "team"]),
  autoArchive: z.boolean().default(false),
  maxMembers: z.number().int().min(1).max(1000).default(50),
});

// Budget whose allocated part may not exceed the total amount.
const projectBudgetSchema = z
  .object({
    amount: z.number().positive(),
    currency: z.enum(["USD", "EUR", "GBP"]),
    allocated: z.number().default(0),
  })
  .refine(
    ({ allocated, amount }) => allocated <= amount,
    "Allocated budget cannot exceed total amount"
  );

// One milestone; `status` is "pending" when omitted.
const projectMilestoneSchema = z.object({
  title: z.string(),
  dueDate: z.string().datetime(),
  status: z.enum(["pending", "in_progress", "completed"]).default("pending"),
});

server.tool(
  "create_project",
  "Create a project with complex nested structure",
  {
    project: z.object({
      name: z.string().min(1).max(100),
      description: z.string().optional(),
      settings: projectSettingsSchema,
      budget: projectBudgetSchema,
      // Between 1 and 20 milestones.
      milestones: z
        .array(projectMilestoneSchema)
        .min(1, "At least one milestone is required")
        .max(20, "Maximum 20 milestones allowed"),
    }),
  },
  async ({ project }) => {
    await createProject(project);
    const milestoneCount = project.milestones.length;
    return {
      content: [{
        type: "text",
        text: `Project "${project.name}" created with ${milestoneCount} milestones`,
      }],
    };
  }
);
带记录的动态对象键
// Tool: update_metadata.
// Demonstrates three `z.record` variants: pattern-checked keys, enum keys,
// and keys mapping to nested objects.

// Metadata keys: a letter or underscore, then letters, digits or underscores
// (case-insensitive).
const metadataKeySchema = z
  .string()
  .regex(/^[a-z_][a-z0-9_]*$/i, "Invalid metadata key format");

// Metadata values: string, number, boolean or null.
const metadataValueSchema = z.union([
  z.string(),
  z.number(),
  z.boolean(),
  z.null(),
]);

// Per-language entry stored under a two-character key such as "en" or "fr".
const translationEntrySchema = z.object({
  title: z.string(),
  description: z.string().optional(),
});

server.tool(
  "update_metadata",
  "Update metadata with dynamic key-value pairs",
  {
    entityId: z.string().uuid("Invalid entity ID format"),
    metadata: z.record(metadataKeySchema, metadataValueSchema),
    permissions: z.record(
      z.enum(["read", "write", "delete", "admin"]),
      z.boolean()
    ),
    translations: z.record(
      z.string().length(2, "Language code must be 2 characters"),
      translationEntrySchema
    ),
  },
  async ({ entityId, metadata, permissions, translations }) => {
    await updateEntity(entityId, { metadata, permissions, translations });
    const fieldCount = Object.keys(metadata).length;
    return {
      content: [{
        type: "text",
        text: `Updated entity ${entityId} with ${fieldCount} metadata fields`,
      }],
    };
  }
);
数组和元组
带约束的数组验证
// Tool: batch_process.
// Shows array constraints (length, uniqueness), per-item object validation
// with a priority sort, and fixed-length tuples.
server.tool(
  "batch_process",
  "Process batch operations with array validation",
  {
    // 1-100 UUID strings.
    ids: z
      .array(z.string().uuid())
      .min(1, "At least one ID required")
      .max(100, "Maximum 100 items per batch"),
    // Strings with no duplicates (a Set of them has the same size).
    tags: z
      .array(z.string())
      .refine(
        (tags) => new Set(tags).size === tags.length,
        "Tags must be unique"
      ),
    // Operations, delivered to the callback ordered by descending priority.
    operations: z
      .array(
        z.object({
          type: z.enum(["create", "update", "delete"]),
          data: z.record(z.any()),
          priority: z.number().int().min(1).max(10).default(5),
        })
      )
      .transform((ops) =>
        // Sort a copy: Array.prototype.sort reorders in place, and a
        // transform should not mutate the value it was handed.
        [...ops].sort((a, b) => b.priority - a.priority)
      ),
    // [latitude, longitude]
    coordinates: z.tuple([
      z.number().min(-90).max(90), // latitude
      z.number().min(-180).max(180), // longitude
    ]),
    // [major, minor, patch, ...extra string elements]
    version: z.tuple([
      z.number().int(), // major
      z.number().int(), // minor
      z.number().int(), // patch
    ]).rest(z.string()),
  },
  async ({ ids, tags, operations, coordinates }) => {
    // `coordinates` is typed [number, number]; `version` is validated by the
    // schema but not used here.
    await processBatch({ ids, tags, operations });
    return {
      content: [{
        type: "text",
        text: `Processed ${operations.length} operations at coordinates ${coordinates.join(", ")}`
      }],
    };
  }
);
枚举和判别联合
枚举模式
// Enum values declared once as constants so they can be reused.
const TaskStatus = {
  PENDING: "pending",
  IN_PROGRESS: "in_progress",
  COMPLETED: "completed",
  CANCELLED: "cancelled",
} as const;
const TaskPriority = ["low", "medium", "high", "urgent"] as const;

// Tool: task_management.
// Shows the different ways of feeding enum values to Zod.
server.tool(
  "task_management",
  "Manage tasks with enum validation",
  {
    action: z.enum(["create", "update", "delete", "archive"]),
    // Enum taken from a const object. `z.nativeEnum` keeps the literal union
    // ("pending" | "in_progress" | ...) in the inferred type; casting
    // Object.values() to [string, ...string[]] widened it to plain `string`.
    status: z.nativeEnum(TaskStatus),
    // Enum taken from a readonly tuple.
    priority: z.enum(TaskPriority),
    // Falls back to "developer" when omitted. `.default()` already accepts
    // undefined, so a separate `.optional()` is unnecessary.
    assigneeRole: z
      .enum(["developer", "designer", "manager", "qa"])
      .default("developer"),
    // Arrives as a string code and is mapped to its name.
    visibility: z
      .enum(["0", "1", "2"])
      .transform((code) => {
        const nameByCode = { "0": "private", "1": "team", "2": "public" } as const;
        // `code` is typed "0" | "1" | "2", so the lookup needs no cast.
        return nameByCode[code];
      }),
  },
  async (params) => {
    await manageTask(params);
    return {
      content: [{
        type: "text",
        text: `Task ${params.action}d with status: ${params.status}`
      }],
    };
  }
);
复杂类型的判别联合
// Tool: process_payment.
// `payment` is a discriminated union keyed on `type`; each variant below
// carries its own required fields.

// Card payment. The lower bound for `expiryYear` is read from the clock once,
// when this schema object is built.
const creditCardPaymentSchema = z.object({
  type: z.literal("credit_card"),
  cardNumber: z.string().regex(/^\d{16}$/, "Card number must be 16 digits"),
  cvv: z.string().regex(/^\d{3,4}$/, "CVV must be 3-4 digits"),
  expiryMonth: z.number().int().min(1).max(12),
  expiryYear: z.number().int().min(new Date().getFullYear()),
});

const bankTransferPaymentSchema = z.object({
  type: z.literal("bank_transfer"),
  accountNumber: z.string(),
  routingNumber: z.string(),
  accountType: z.enum(["checking", "savings"]),
});

// Wallet address: "0x" followed by 40 hexadecimal characters.
const cryptoPaymentSchema = z.object({
  type: z.literal("crypto"),
  walletAddress: z.string().regex(/^0x[a-fA-F0-9]{40}$/, "Invalid wallet address"),
  network: z.enum(["ethereum", "polygon", "arbitrum"]),
  tokenAddress: z.string().optional(),
});

const paypalPaymentSchema = z.object({
  type: z.literal("paypal"),
  email: z.string().email(),
  paypalId: z.string().optional(),
});

server.tool(
  "process_payment",
  "Process different payment types with discriminated unions",
  {
    payment: z.discriminatedUnion("type", [
      creditCardPaymentSchema,
      bankTransferPaymentSchema,
      cryptoPaymentSchema,
      paypalPaymentSchema,
    ]),
    amount: z.number().positive(),
    currency: z.string().length(3, "Currency code must be 3 characters"),
  },
  async ({ payment, amount, currency }) => {
    // Checking `payment.type` narrows `payment` to the matching variant.
    switch (payment.type) {
      case "credit_card":
        return await processCreditCard(payment, amount, currency);
      case "crypto":
        return await processCrypto(payment, amount, currency);
      default:
        // Other payment types get the generic confirmation below.
        break;
    }
    return {
      content: [{
        type: "text",
        text: `Payment of ${amount} ${currency} processed via ${payment.type}`,
      }],
    };
  }
);
转换和预处理
数据转换管道
// Tool: import_data.
// Every field arrives as a string and is converted to the shape the business
// logic needs. Conversion failures are reported as Zod issues, never thrown.
server.tool(
  "import_data",
  "Import and transform data with preprocessing",
  {
    // String -> integer. parseInt returns NaN for non-numeric text, so the
    // result is checked instead of letting NaN through as a user id.
    userId: z
      .string()
      .transform((val) => Number.parseInt(val, 10))
      .refine((id) => !Number.isNaN(id), "userId must be a numeric string"),
    // JSON text -> object, then validated against the expected structure.
    config: z
      .string()
      .transform((str, ctx) => {
        try {
          return JSON.parse(str);
        } catch {
          // Report through ctx: an exception thrown inside a transform is
          // not turned into a validation error by Zod.
          ctx.addIssue({
            code: z.ZodIssueCode.custom,
            message: "Invalid JSON configuration",
          });
          return z.NEVER;
        }
      })
      .pipe(
        z.object({
          enabled: z.boolean(),
          threshold: z.number(),
        })
      ),
    // Strip non-digits, require exactly 10 digits, then prefix "+1".
    phone: z
      .string()
      .transform((phone) => phone.replace(/\D/g, ""))
      .refine((phone) => phone.length === 10, "Phone must be 10 digits")
      .transform((phone) => `+1${phone}`),
    // Comma-separated list -> array of non-empty, trimmed strings.
    tags: z
      .string()
      .transform((str) => str.split(",").map((s) => s.trim()))
      .pipe(z.array(z.string().min(1))),
    // "<start> to <end>" -> { start: Date, end: Date }.
    dateRange: z
      .string()
      .transform((str) => {
        const [start, end] = str.split(" to ").map((d) => new Date(d.trim()));
        return { start, end };
      })
      .superRefine(({ start, end }, ctx) => {
        // `end` is undefined when the separator is missing, and either value
        // is an Invalid Date when its text cannot be parsed. Report that
        // separately rather than as an ordering problem.
        const bothValid = [start, end].every(
          (d) => d instanceof Date && !Number.isNaN(d.getTime())
        );
        if (!bothValid) {
          ctx.addIssue({
            code: z.ZodIssueCode.custom,
            message: 'Date range must be "<start> to <end>" with two valid dates',
          });
          return;
        }
        if (!(start < end)) {
          ctx.addIssue({
            code: z.ZodIssueCode.custom,
            message: "Start date must be before end date",
          });
        }
      }),
  },
  async (data) => {
    // `data` holds the converted values: numeric userId, parsed config, etc.
    await importData(data);
    return {
      content: [{
        type: "text",
        text: `Imported data for user ${data.userId} with ${data.tags.length} tags`
      }],
    };
  }
);
用于规范化的预处理
// Maps "", null and undefined to undefined ("not supplied") and converts
// anything else with Number(). Undefined must be handled explicitly:
// Number(undefined) is NaN, which would make an omitted optional price fail
// validation.
const toOptionalNumber = (val) =>
  (val === "" || val == null ? undefined : Number(val));

// Tool: search_products.
// Each field is normalised by `z.preprocess` before its schema is applied, so
// loosely formatted input is accepted.
server.tool(
  "search_products",
  "Search products with input preprocessing",
  {
    // Strings are trimmed and lower-cased; numbers become strings.
    query: z.preprocess(
      (val) => {
        if (typeof val === "string") return val.trim().toLowerCase();
        if (typeof val === "number") return String(val);
        return val;
      },
      z.string().min(1, "Search query required")
    ),
    // Optional price bounds, coerced to numbers when supplied.
    minPrice: z.preprocess(toOptionalNumber, z.number().min(0).optional()),
    maxPrice: z.preprocess(toOptionalNumber, z.number().positive().optional()),
    // Boolean-like values: true/"true"/"1"/1 and false/"false"/"0"/0.
    // Anything else is treated as not supplied.
    inStock: z.preprocess(
      (val) => {
        if (typeof val === "boolean") return val;
        if (val === "true" || val === "1" || val === 1) return true;
        if (val === "false" || val === "0" || val === 0) return false;
        return undefined;
      },
      z.boolean().optional()
    ),
    // Date text -> Date. Falsy or unparseable input is treated as not supplied.
    availableAfter: z.preprocess(
      (val) => {
        if (!val) return undefined;
        const date = new Date(String(val));
        return Number.isNaN(date.getTime()) ? undefined : date;
      },
      z.date().optional()
    ),
  },
  async (params) => {
    const products = await searchProducts(params);
    return {
      content: [{
        type: "text",
        text: `Found ${products.length} products matching "${params.query}"`
      }],
    };
  }
);
细化和自定义验证
基本细化
// Tool: create_event.
// Field-level checks plus one object-level rule on `location`.

// Location: a physical one needs an address and a virtual one needs a URL.
// No extra requirement is placed on "hybrid".
const eventLocationSchema = z
  .object({
    type: z.enum(["physical", "virtual", "hybrid"]),
    address: z.string().optional(),
    url: z.string().url().optional(),
    capacity: z.number().int().positive().optional(),
  })
  .refine(
    (loc) => {
      const missingAddress = loc.type === "physical" && !loc.address;
      const missingUrl = loc.type === "virtual" && !loc.url;
      return !(missingAddress || missingUrl);
    },
    { message: "Physical events need address, virtual events need URL" }
  );

server.tool(
  "create_event",
  "Create event with complex validation rules",
  {
    name: z.string(),
    startTime: z.string().datetime(),
    endTime: z.string().datetime(),
    maxAttendees: z.number().int().positive(),
    currentAttendees: z.number().int().min(0).default(0),
    location: eventLocationSchema,
  },
  async (eventData) => {
    await createEvent(eventData);
    return {
      content: [{
        type: "text",
        text: `Event "${eventData.name}" created successfully`,
      }],
    };
  }
);
用于多重验证的 SuperRefine
// Field-level rules for a deployment configuration. Declared once and used
// both as the tool's input shape and as the base of `deploymentSchema`, so
// the two cannot drift apart.
const deploymentShape = {
  environment: z.enum(["development", "staging", "production"]),
  replicas: z.number().int().positive(),
  memory: z.number().positive(), // in GB
  cpu: z.number().positive(), // in cores
  autoScaling: z.boolean(),
  minReplicas: z.number().int().positive().optional(),
  maxReplicas: z.number().int().positive().optional(),
  customDomain: z.string().optional(),
  ssl: z.boolean().optional(),
};

// Cross-field rules layered on top of the shape with superRefine. All
// violations are collected, so one call can report several problems at once.
const deploymentSchema = z
  .object(deploymentShape)
  .superRefine((data, ctx) => {
    // Production-only minimums.
    if (data.environment === "production") {
      if (data.replicas < 2) {
        ctx.addIssue({
          code: z.ZodIssueCode.custom,
          message: "Production must have at least 2 replicas",
          path: ["replicas"],
        });
      }
      if (data.memory < 4) {
        ctx.addIssue({
          code: z.ZodIssueCode.too_small,
          message: "Production requires at least 4GB memory",
          path: ["memory"],
          minimum: 4,
          inclusive: true,
          type: "number",
        });
      }
    }
    // Auto-scaling needs both bounds, in the right order.
    if (data.autoScaling) {
      if (!data.minReplicas || !data.maxReplicas) {
        ctx.addIssue({
          code: z.ZodIssueCode.custom,
          message: "Auto-scaling requires both minReplicas and maxReplicas",
        });
      }
      if (data.minReplicas && data.maxReplicas && data.minReplicas > data.maxReplicas) {
        ctx.addIssue({
          code: z.ZodIssueCode.custom,
          message: "minReplicas cannot be greater than maxReplicas",
          path: ["minReplicas"],
        });
      }
    }
    // A custom domain requires SSL.
    if (data.customDomain && !data.ssl) {
      ctx.addIssue({
        code: z.ZodIssueCode.custom,
        message: "Custom domain requires SSL to be enabled",
        path: ["ssl"],
      });
    }
    // Resource ratio: no more CPU cores than GB of memory.
    const cpuMemoryRatio = data.cpu / data.memory;
    if (cpuMemoryRatio > 1) {
      ctx.addIssue({
        code: z.ZodIssueCode.custom,
        message: "CPU cores should not exceed memory (GB)",
      });
    }
  });

// Tool: configure_deployment.
// The tool is registered with the plain shape, which only covers the
// per-field rules. The cross-field rules live on `deploymentSchema`, so the
// callback applies that schema itself before doing any work.
server.tool(
  "configure_deployment",
  "Configure deployment with interdependent validations",
  deploymentShape,
  async (config) => {
    const checked = deploymentSchema.safeParse(config);
    if (!checked.success) {
      const reasons = checked.error.issues.map((issue) => issue.message).join("; ");
      return {
        isError: true,
        content: [{
          type: "text",
          text: `Invalid deployment configuration: ${reasons}`
        }],
      };
    }
    await configureDeployment(checked.data);
    return {
      content: [{
        type: "text",
        text: `Deployment configured for ${config.environment} with ${config.replicas} replicas`
      }],
    };
  }
);
异步细化
// Tool: register_domain.
// Each refinement below is asynchronous: it awaits an external lookup before
// accepting the value.
server.tool(
  "register_domain",
  "Register domain with async validation",
  {
    // Must match the format pattern and be reported as available.
    domain: z
      .string()
      .regex(/^[a-z0-9-]+\.[a-z]{2,}$/, "Invalid domain format")
      .refine(
        async (candidate) => checkDomainAvailability(candidate),
        "Domain is not available"
      ),
    owner: z.object({
      // Valid e-mail address that the blacklist lookup does not flag.
      email: z
        .string()
        .email()
        .refine(
          async (address) => !(await checkEmailBlacklist(address)),
          "Email is blacklisted"
        ),
      // UUID that the organization lookup confirms.
      organizationId: z
        .string()
        .uuid()
        .refine(
          async (orgId) => organizationExists(orgId),
          "Organization not found"
        ),
    }),
    // Registration length in years (1-10).
    duration: z.number().int().min(1).max(10),
  },
  async ({ domain, owner, duration }) => {
    await registerDomain({ domain, owner, duration });
    return {
      content: [{
        type: "text",
        text: `Domain ${domain} registered for ${duration} years`,
      }],
    };
  }
);
联合类型和可选处理
灵活的输入类型
// Tool: flexible_query.
// Accepts several input representations per field via unions and normalises
// them where a transform is attached.
server.tool(
  "flexible_query",
  "Handle multiple input formats with unions",
  {
    // UUID string, positive integer, or a custom "AB-123456"-style code.
    identifier: z.union([
      z.string().uuid(),
      z.number().int().positive(),
      z.string().regex(/^[A-Z]{2,4}-\d{6}$/),
    ]),
    // Datetime string, YYYY-MM-DD string, or Unix timestamp -> Date.
    date: z.union([
      z.string().datetime(),
      z.string().regex(/^\d{4}-\d{2}-\d{2}$/), // YYYY-MM-DD
      z.number(), // Unix timestamp
    ]).transform((val) => {
      // Numbers are multiplied by 1000 before being handed to Date.
      if (typeof val === "number") return new Date(val * 1000);
      return new Date(val);
    }),
    // String, string array or { include?, exclude? }, always delivered as
    // { include: string[], exclude: string[] }.
    filter: z
      .union([
        z.string(),
        z.array(z.string()),
        z.object({
          include: z.array(z.string()).optional(),
          exclude: z.array(z.string()).optional(),
        }),
      ])
      .optional()
      .transform((filter) => {
        // A missing filter and an empty string both mean "no filter".
        if (!filter) return { include: [], exclude: [] };
        if (typeof filter === "string") return { include: [filter], exclude: [] };
        if (Array.isArray(filter)) return { include: filter, exclude: [] };
        // Object form: fill in whichever list was left out so callers never
        // see undefined.
        return {
          include: filter.include ?? [],
          exclude: filter.exclude ?? [],
        };
      }),
    // Nullable versus optional:
    description: z.string().nullable(), // may be null
    metadata: z.record(z.any()).optional(), // may be undefined
    notes: z.string().nullish(), // may be null or undefined
  },
  async (params) => {
    await executeQuery(params);
    return {
      content: [{
        type: "text",
        text: `Query executed with identifier: ${params.identifier}`
      }],
    };
  }
);
错误处理和自定义消息
全面的错误消息
// User input schema in which every rule carries its own message.
const userInputSchema = z.object({
  // 3-20 characters: letters, digits and underscores only.
  username: z
    .string({
      required_error: "Username is required",
      invalid_type_error: "Username must be a string",
    })
    .min(3, "Username must be at least 3 characters long")
    .max(20, "Username cannot exceed 20 characters")
    .regex(
      /^[a-zA-Z0-9_]+$/,
      "Username can only contain letters, numbers, and underscores"
    ),
  // Positive whole number, at most 120.
  age: z
    .number({
      required_error: "Age is required",
      invalid_type_error: "Age must be a number",
    })
    .int("Age must be a whole number")
    .positive("Age must be positive")
    .max(120, "Age cannot exceed 120"),
  // Valid address that does not contain the substring "tempmail".
  email: z
    .string()
    .email("Please provide a valid email address")
    .refine(
      (address) => !address.includes("tempmail"),
      "Temporary email addresses are not allowed"
    ),
});
// Tool: validate_user.
// Registers the fields of `userInputSchema` as the tool's input shape.
server.tool(
  "validate_user",
  "Validate user input with detailed error messages",
  userInputSchema.shape,
  // The callback only runs once every field has passed validation.
  async ({ username }) => ({
    content: [{
      type: "text",
      text: `User ${username} validated successfully`,
    }],
  })
);
错误路径规范
// Nested schema whose cross-field issues are attached to explicit paths.
const complexSchema = z
  .object({
    user: z.object({
      profile: z.object({
        firstName: z.string(),
        lastName: z.string(),
        age: z.number(),
      }),
      settings: z.object({
        notifications: z.boolean(),
        theme: z.enum(["light", "dark"]),
      }),
    }),
    permissions: z.array(z.string()),
  })
  .superRefine(({ user, permissions }, ctx) => {
    const { firstName, lastName, age } = user.profile;

    // Reported on the top-level `permissions` field.
    if (age < 18 && permissions.includes("admin")) {
      ctx.addIssue({
        code: z.ZodIssueCode.custom,
        message: "Users under 18 cannot have admin permissions",
        path: ["permissions"],
      });
    }

    // Reported on the nested `user.profile.lastName` field.
    if (firstName === lastName) {
      ctx.addIssue({
        code: z.ZodIssueCode.custom,
        message: "First name and last name must be different",
        path: ["user", "profile", "lastName"],
      });
    }
  });
性能优化模式
用于递归结构的惰性(lazy)模式
// Recursive type (e.g. a tree structure or nested comments).
type TreeNode = {
  id: string;
  value: any;
  children?: TreeNode[];
};

// `z.lazy` defers building the object schema until it is first needed, which
// is what allows `children` to refer back to `treeNodeSchema` itself.
const treeNodeSchema: z.ZodType<TreeNode> = z.lazy(() => {
  const nodeSchema = z.object({
    id: z.string().uuid(),
    value: z.any(),
    children: treeNodeSchema.array().optional(),
  });
  return nodeSchema;
});
// Tool: process_tree.
// Takes a recursive tree and a depth limit (1-10, default 5), and passes both
// to `processTreeStructure`.
server.tool(
  "process_tree",
  "Process hierarchical tree structure",
  {
    tree: treeNodeSchema,
    maxDepth: z.number().int().positive().max(10).default(5),
  },
  async ({ tree, maxDepth }) => {
    await processTreeStructure(tree, maxDepth);
    const text = `Processed tree with root ID: ${tree.id}`;
    return { content: [{ type: "text", text }] };
  }
);
模式组合和重用
// Base schema meant for reuse: a postal address.
// Five digits, optionally followed by a hyphen and four more.
const ZIP_CODE_PATTERN = /^\d{5}(-\d{4})?$/;

const addressSchema = z.object({
  street: z.string(),
  city: z.string(),
  // Exactly two characters.
  state: z.string().length(2),
  zipCode: z.string().regex(ZIP_CODE_PATTERN),
  // "US" when omitted.
  country: z.string().default("US"),
});
// Base schema meant for reuse: contact details.
// Optional leading "+", then the characters allowed by the class below.
const PHONE_PATTERN = /^\+?[\d\s-()]+$/;

const contactSchema = z.object({
  phone: z.string().regex(PHONE_PATTERN),
  email: z.string().email(),
  // "email" when omitted.
  preferredContact: z.enum(["phone", "email", "both"]).default("email"),
});
// Person = identity fields + every field of `contactSchema`.
// This is built by extending an object schema, not with `z.intersection`: an
// intersection is not an object schema and has no `.extend()`, which the
// schemas derived from `personSchema` rely on.
const personSchema = z
  .object({
    id: z.string().uuid(),
    firstName: z.string(),
    lastName: z.string(),
    dateOfBirth: z.string().date(),
  })
  .extend(contactSchema.shape);
// Schema extended for a specific use case: an employee record.
// NOTE(review): relies on `personSchema` exposing `.extend()` — confirm that
// it is an object schema.
const employeeSchema = personSchema.extend({
  employeeId: z.string(),
  department: z.enum(["engineering", "sales", "hr", "finance"]),
  salary: z.number().positive(),
  address: addressSchema,
  // Same fields as `contactSchema`, all of them optional.
  emergencyContact: contactSchema.partial(),
});
// Schema extended for a specific use case: a customer record.
// NOTE(review): relies on `personSchema` exposing `.extend()` — confirm that
// it is an object schema.
const customerSchema = personSchema.extend({
  customerId: z.string(),
  // Non-negative whole number; 0 when omitted.
  loyaltyPoints: z.number().int().min(0).default(0),
  shippingAddress: addressSchema,
  billingAddress: addressSchema.optional(),
  // Newsletter is on by default; SMS alerts are off by default.
  preferences: z.object({
    newsletter: z.boolean().default(true),
    smsAlerts: z.boolean().default(false),
  }),
});
// Tool: manage_person.
// Routes an employee or customer record to the matching manager function.
server.tool(
  "manage_person",
  "Manage employee or customer records",
  {
    type: z.enum(["employee", "customer"]),
    data: z.union([employeeSchema, customerSchema]),
    action: z.enum(["create", "update", "archive"]),
  },
  async ({ type, data, action }) => {
    // `type` and `data` are validated independently of each other, so the
    // casts below assert the pairing rather than having it proven by the schema.
    return type === "employee"
      ? await manageEmployee(data as z.infer<typeof employeeSchema>, action)
      : await manageCustomer(data as z.infer<typeof customerSchema>, action);
  }
);
真实世界的 MCP 工具示例
完整的文件处理工具
// Enum schemas for file formats and for the kinds of processing operation.
const FileFormat = z.enum(["csv", "json", "xml", "yaml", "txt"]);
const Operation = z.enum(["parse", "validate", "transform", "merge"]);
// Tool: process_files.
// Runs one of four operations over 1-10 files. `options` is a discriminated
// union keyed on its own `operation` field and must describe the same
// operation as the top-level `operation` argument.
server.tool(
  "process_files",
  "Advanced file processing with multiple operations",
  {
    files: z.array(
      z.object({
        name: z.string().regex(/^[\w\-. ]+$/, "Invalid filename"),
        format: FileFormat,
        // The limit applies to the string's length.
        content: z.string().max(10 * 1024 * 1024, "File size exceeds 10MB"),
        encoding: z.enum(["utf8", "base64", "ascii"]).default("utf8"),
      })
    ).min(1, "At least one file required")
      .max(10, "Maximum 10 files at once"),
    operation: Operation,
    options: z.discriminatedUnion("operation", [
      z.object({
        operation: z.literal("parse"),
        delimiter: z.string().optional(),
        headers: z.boolean().default(true),
        skipRows: z.number().int().min(0).default(0),
      }),
      z.object({
        operation: z.literal("validate"),
        schema: z.record(z.any()), // JSON schema used for validation
        strict: z.boolean().default(false),
      }),
      z.object({
        operation: z.literal("transform"),
        transformations: z.array(
          z.object({
            field: z.string(),
            operation: z.enum(["uppercase", "lowercase", "trim", "replace", "remove"]),
            value: z.string().optional(),
          })
        ),
      }),
      z.object({
        operation: z.literal("merge"),
        mergeKey: z.string(),
        conflictResolution: z.enum(["keep_first", "keep_last", "combine"]),
      }),
    ]),
    output: z.object({
      format: FileFormat,
      compression: z.enum(["none", "gzip", "zip"]).default("none"),
      splitSize: z.number().int().positive().optional(), // KB
    }),
  },
  async ({ files, operation, options, output }) => {
    // The two `operation` values are validated separately, so nothing in the
    // schema stops them from disagreeing. Reject a mismatch here rather than
    // pass one operation's options to another operation's processor.
    if (options.operation !== operation) {
      return {
        isError: true,
        content: [{
          type: "text",
          text: `Options are for "${options.operation}" but the requested operation is "${operation}"`
        }],
      };
    }
    // One processor per operation type.
    const processors = {
      parse: parseFiles,
      validate: validateFiles,
      transform: transformFiles,
      merge: mergeFiles,
    };
    const result = await processors[operation](files, options);
    // Format the output.
    await formatOutput(result, output);
    return {
      content: [{
        type: "text",
        text: `Processed ${files.length} files with ${operation} operation. Output: ${output.format}`
      }],
    };
  }
);
带速率限制的 API 集成工具
// Hosts this tool is allowed to call.
const ALLOWED_API_HOSTS = new Set(["api.example.com", "api.partner.com"]);
// HTTP methods that must carry a request body.
const METHODS_REQUIRING_BODY = new Set(["POST", "PUT", "PATCH"]);

// Tool: api_request.
// Validates the request description, then rate-limits, authenticates and
// executes it with retries.
server.tool(
  "api_request",
  "Make API requests with validation and rate limiting",
  {
    // Must be a URL whose hostname is on the allow-list.
    endpoint: z
      .string()
      .url("Invalid URL format")
      .refine(
        (url) => {
          // Guard the parse: depending on the Zod version this refinement may
          // still run on a value `.url()` has already rejected, and
          // `new URL` throws on such input.
          try {
            return ALLOWED_API_HOSTS.has(new URL(url).hostname);
          } catch {
            return false;
          }
        },
        "API domain not whitelisted"
      ),
    method: z.enum(["GET", "POST", "PUT", "PATCH", "DELETE"]),
    // Caller headers plus a default Content-Type and a fixed User-Agent.
    headers: z
      .record(z.string())
      .default({})
      .transform((headers) => ({
        ...headers,
        "Content-Type": headers["Content-Type"] || "application/json",
        "User-Agent": "MCP-Server/1.0",
      })),
    // Any value. Whether a body is required depends on `method`, which a
    // field-level refinement cannot see, so that rule is enforced in the
    // callback below.
    body: z.any().optional(),
    auth: z.discriminatedUnion("type", [
      z.object({
        type: z.literal("bearer"),
        token: z.string().min(1),
      }),
      z.object({
        type: z.literal("api_key"),
        key: z.string().min(1),
        location: z.enum(["header", "query"]),
      }),
      z.object({
        type: z.literal("basic"),
        username: z.string(),
        password: z.string(),
      }),
      z.object({
        type: z.literal("oauth2"),
        clientId: z.string(),
        clientSecret: z.string(),
        scope: z.array(z.string()).optional(),
      }),
    ]).optional(),
    retry: z.object({
      maxAttempts: z.number().int().min(1).max(5).default(3),
      backoffMs: z.number().int().min(100).max(60000).default(1000),
      retryOn: z.array(z.number().int()).default([429, 500, 502, 503, 504]),
    }).optional(),
    timeout: z.number().int().min(1000).max(300000).default(30000), // ms
  },
  async (params) => {
    // Cross-field rule: POST/PUT/PATCH need a body.
    if (METHODS_REQUIRING_BODY.has(params.method) && params.body == null) {
      return {
        isError: true,
        content: [{
          type: "text",
          text: "Body required for POST/PUT/PATCH requests"
        }],
      };
    }
    // Apply rate limiting.
    await rateLimiter.checkLimit(params.endpoint);
    // Build the authenticated request.
    const request = buildAuthenticatedRequest(params);
    // Execute with retry logic.
    const response = await executeWithRetry(request, params.retry);
    return {
      content: [{
        type: "text",
        text: `API request to ${params.endpoint} completed with status ${response.status}`
      }],
    };
  }
);
MCP Zod 模式的最佳实践
1. 使用描述性错误消息
// ❌ Bad: generic message
z.string().min(3)
// ✅ Good: specific, actionable message
z.string().min(3, "Username must be at least 3 characters long")
2. 在合理的地方提供默认值
// ✅ Good: provide defaults for optional configuration
z.object({
retryCount: z.number().int().default(3),
timeout: z.number().default(30000),
debug: z.boolean().default(false),
})
3. 使用 Transform 进行数据规范化
// ✅ Good: normalize data as early as possible
z.object({
email: z.string().email().transform(e => e.toLowerCase()),
tags: z.string().transform(s => s.split(",").map(t => t.trim())),
})
4. 利用判别联合处理复杂类型
// ✅ Good: type-safe branching
z.discriminatedUnion("type", [
z.object({ type: z.literal("A"), fieldA: z.string() }),
z.object({ type: z.literal("B"), fieldB: z.number() }),
])
5. 使用细化处理业务逻辑
// ✅ Good: encapsulate the business rule in the schema
z.object({
startDate: z.date(),
endDate: z.date(),
}).refine(
data => data.startDate < data.endDate,
"End date must be after start date"
)
6. 创建可重用的模式组件
// ✅ Good: DRY principle
const paginationSchema = z.object({
page: z.number().int().positive().default(1),
limit: z.number().int().min(1).max(100).default(20),
});
// Reuse across multiple tools
// NOTE(review): this call passes no handler callback — presumably abbreviated
// for illustration; confirm before copying.
server.tool("list_users", "...", {
...paginationSchema.shape,
filter: z.string().optional(),
});
7. 适当处理可选与可空
// Know the difference:
z.string().optional() // string | undefined
z.string().nullable() // string | null
z.string().nullish() // string | null | undefined
结论
Zod 为 MCP 服务器验证提供了强大的、类型安全的基础。通过利用其全面的 API——从基本原语到复杂的判别联合、转换和细化——您可以构建健壮的 MCP 工具,来处理 AI 模型可能发送的任何数据结构。
关键是将您的 Zod 模式视为契约:它们准确定义了您的工具期望的内容,在不满足期望时提供有用的错误消息,并将数据转换为业务逻辑所需的确切形状。通过本指南中的模式和示例,您具备了处理 MCP 服务器中任何验证场景的能力。