草案 "智能体设计模式 - 安全防护与保障"
在 TypeScript/Vercel 生产环境中为 LLM 智能体实施强健的安全防护措施,防止有害输出、提示注入攻击,并确保可靠运行。
思维模型:AI 系统的深度防御
将智能体安全想象成构建一个安全的银行应用程序。你需要多层安全防护:输入验证(类似于 SQL 输入清理)、运行时监控(类似于欺诈检测)、输出过滤(类似于 PII 掩码)和熔断器(类似于速率限制)。每一层都能捕获不同的威胁 - 提示注入就是你的 SQL 注入,幻觉就是你的数据损坏错误,恶意输出就是你的安全漏洞。正如你不会仅仅依赖单一防火墙,有效的 AI 安全需要多层防御系统协同工作。
基础示例:输入验证和输出过滤
1. 简单基于关键词的安全防护
// lib/guards/basic-safety.ts
import { isEmpty, includes, some, toLower } from 'es-toolkit';
interface SafetyCheckResult {
safe: boolean;
violations: string[];
confidence: number;
}
export class BasicSafetyGuard {
private blockedTerms = [
'ignore instructions',
'disregard previous',
'system prompt',
'reveal instructions'
];
private suspiciousPatterns = [
/\bAPI[_\s]KEY\b/i,
/password\s*[:=]/i,
/DROP\s+TABLE/i,
/\<script\>/i
];
checkInput(input: string): SafetyCheckResult {
if (isEmpty(input)) {
return { safe: true, violations: [], confidence: 1.0 };
}
const lowerInput = toLower(input);
const violations: string[] = [];
// Check blocked terms
const foundBlockedTerms = this.blockedTerms.filter(term =>
includes(lowerInput, term)
);
violations.push(...foundBlockedTerms.map(t => `Blocked term: ${t}`));
// Check suspicious patterns
const matchedPatterns = this.suspiciousPatterns.filter(pattern =>
pattern.test(input)
);
violations.push(...matchedPatterns.map(p => `Suspicious pattern: ${p.source}`));
return {
safe: violations.length === 0,
violations,
confidence: violations.length === 0 ? 1.0 : 0.2
};
}
}
使用关键词匹配和正则表达式模式来捕获常见提示注入攻击的基础安全防护,延迟最小。
2. 与 API 路由集成
// app/api/chat/route.ts
import { ChatGoogleGenerativeAI } from '@langchain/google-genai';
import { BasicSafetyGuard } from '@/lib/guards/basic-safety';
import { NextResponse } from 'next/server';
export const runtime = 'nodejs';
export const maxDuration = 60;
const guard = new BasicSafetyGuard();
export async function POST(req: Request) {
try {
const { message } = await req.json();
// Input validation
const inputCheck = guard.checkInput(message);
if (!inputCheck.safe) {
return NextResponse.json(
{
error: 'Input contains prohibited content',
violations: inputCheck.violations
},
{ status: 400 }
);
}
const model = new ChatGoogleGenerativeAI({
modelName: 'gemini-2.5-flash',
temperature: 0.3,
maxOutputTokens: 2048,
});
const response = await model.invoke(message);
// Output validation
const outputCheck = guard.checkInput(response.content as string);
if (!outputCheck.safe) {
console.error('Output safety violation:', outputCheck.violations);
return NextResponse.json(
{ content: 'I cannot provide that information.' },
{ status: 200 }
);
}
return NextResponse.json({ content: response.content });
} catch (error) {
console.error('Chat error:', error);
return NextResponse.json(
{ error: 'Processing failed' },
{ status: 500 }
);
}
}
具有双层保护的 API 路由,在返回响应给用户之前检查输入和输出。
3. 基于令牌跟踪的速率限制
// lib/guards/rate-limiter.ts
import { groupBy, sumBy, filter } from 'es-toolkit';
import { differenceInMinutes } from 'es-toolkit/compat';
interface TokenUsage {
userId: string;
tokens: number;
timestamp: Date;
}
export class TokenRateLimiter {
private usage: Map<string, TokenUsage[]> = new Map();
private readonly maxTokensPerMinute = 10000;
private readonly maxTokensPerHour = 50000;
async checkLimit(userId: string, estimatedTokens: number): Promise<boolean> {
const now = new Date();
const userUsage = this.usage.get(userId) || [];
// Clean old entries
const relevantUsage = filter(userUsage, entry =>
differenceInMinutes(now, entry.timestamp) < 60
);
// Calculate usage in different windows
const lastMinute = filter(relevantUsage, entry =>
differenceInMinutes(now, entry.timestamp) < 1
);
const lastHour = relevantUsage;
const minuteTokens = sumBy(lastMinute, 'tokens');
const hourTokens = sumBy(lastHour, 'tokens');
if (minuteTokens + estimatedTokens > this.maxTokensPerMinute) {
throw new Error(`Rate limit exceeded: ${minuteTokens}/${this.maxTokensPerMinute} tokens/min`);
}
if (hourTokens + estimatedTokens > this.maxTokensPerHour) {
throw new Error(`Hourly limit exceeded: ${hourTokens}/${this.maxTokensPerHour} tokens/hour`);
}
// Record usage
relevantUsage.push({ userId, tokens: estimatedTokens, timestamp: now });
this.usage.set(userId, relevantUsage);
return true;
}
}
基于令牌的速率限制器,跟踪多个时间窗口的使用情况,防止滥用的同时允许突发流量。
4. 带有安全反馈的前端集成
// components/SafeChatInterface.tsx
'use client';
import { useState } from 'react';
import { useMutation } from '@tanstack/react-query';
export default function SafeChatInterface() {
const [input, setInput] = useState('');
const [messages, setMessages] = useState<Array<{role: string, content: string}>>([]);
const sendMessage = useMutation({
mutationFn: async (message: string) => {
const response = await fetch('/api/chat', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ message })
});
if (!response.ok) {
const error = await response.json();
throw new Error(error.violations?.join(', ') || 'Failed to send message');
}
return response.json();
},
onSuccess: (data) => {
setMessages(prev => [
...prev,
{ role: 'user', content: input },
{ role: 'assistant', content: data.content }
]);
setInput('');
}
});
return (
<div className="card bg-base-100 shadow-xl">
<div className="card-body">
<h2 className="card-title">安全 AI 助手</h2>
<div className="h-96 overflow-y-auto space-y-4 p-4 bg-base-200 rounded">
{messages.map((msg, i) => (
<div key={i} className={`chat chat-${msg.role === 'user' ? 'end' : 'start'}`}>
<div className={`chat-bubble ${msg.role === 'user' ? 'chat-bubble-primary' : ''}`}>
{msg.content}
</div>
</div>
))}
</div>
{sendMessage.isError && (
<div className="alert alert-error">
<span>{sendMessage.error?.message}</span>
</div>
)}
<div className="join w-full">
<input
type="text"
value={input}
onChange={(e) => setInput(e.target.value)}
onKeyPress={(e) => e.key === 'Enter' && sendMessage.mutate(input)}
placeholder="安全输入..."
className="input input-bordered join-item flex-1"
disabled={sendMessage.isPending}
/>
<button
onClick={() => sendMessage.mutate(input)}
className="btn btn-primary join-item"
disabled={sendMessage.isPending || !input.trim()}
>
{sendMessage.isPending ? (
<span className="loading loading-spinner"></span>
) : '发送'}
</button>
</div>
</div>
</div>
);
}
具有内置错误处理的 React 组件,以友好的方式向用户显示安全违规信息。
高级示例:使用 LangGraph 的多层防御
1. 高级内容审核系统
// lib/guards/content-moderator.ts
import { ChatGoogleGenerativeAI } from '@langchain/google-genai';
import { z } from 'zod';
import { memoize, debounce } from 'es-toolkit';
import { StructuredOutputParser } from '@langchain/core/output_parsers';
const ModerationResultSchema = z.object({
safe: z.boolean(),
categories: z.object({
toxic: z.number().min(0).max(1),
violent: z.number().min(0).max(1),
sexual: z.number().min(0).max(1),
harmful: z.number().min(0).max(1),
pii: z.number().min(0).max(1)
}),
explanation: z.string(),
suggestedAction: z.enum(['allow', 'block', 'review', 'redact'])
});
export class ContentModerator {
private model: ChatGoogleGenerativeAI;
private parser: StructuredOutputParser<z.infer<typeof ModerationResultSchema>>;
private cache = new Map<string, z.infer<typeof ModerationResultSchema>>();
constructor() {
this.model = new ChatGoogleGenerativeAI({
modelName: 'gemini-2.5-flash',
temperature: 0,
maxOutputTokens: 500
});
this.parser = StructuredOutputParser.fromZodSchema(ModerationResultSchema);
}
// Memoized moderation for performance
moderate = memoize(async (content: string) => {
if (this.cache.has(content)) {
return this.cache.get(content)!;
}
const prompt = `
Analyze this content for safety issues:
"${content}"
${this.parser.getFormatInstructions()}
`;
const response = await this.model.invoke(prompt);
const result = await this.parser.parse(response.content as string);
this.cache.set(content, result);
return result;
}, {
getCacheKey: (content) => content.substring(0, 100) // Cache by first 100 chars
});
async moderateWithFallback(content: string): Promise<z.infer<typeof ModerationResultSchema>> {
try {
return await this.moderate(content);
} catch (error) {
console.error('Moderation failed, using fallback:', error);
// Fallback to basic checks
return {
safe: !this.hasObviousIssues(content),
categories: {
toxic: 0,
violent: 0,
sexual: 0,
harmful: 0,
pii: this.detectPII(content) ? 1 : 0
},
explanation: 'Fallback moderation',
suggestedAction: 'review'
};
}
}
private hasObviousIssues(content: string): boolean {
const issues = [
/\b(kill|murder|die)\b/i,
/\b(hate|racist|sexist)\b/i
];
return issues.some(pattern => pattern.test(content));
}
private detectPII(content: string): boolean {
const piiPatterns = [
/\b\d{3}-\d{2}-\d{4}\b/, // SSN
/\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b/, // Email
/\b\d{16}\b/, // Credit card
/\b\d{3}[-.]?\d{3}[-.]?\d{4}\b/ // Phone
];
return piiPatterns.some(pattern => pattern.test(content));
}
}
使用基于 LLM 的分析并有回退到正则表达式模式的复杂内容审核系统,确保可靠性。
2. 使用 LangGraph 的有状态安全工作流
// lib/workflows/safety-workflow.ts
import { StateGraph, END } from '@langchain/langgraph';
import { BaseMessage, HumanMessage, AIMessage } from '@langchain/core/messages';
import { ContentModerator } from '@/lib/guards/content-moderator';
import { TokenRateLimiter } from '@/lib/guards/rate-limiter';
import { partition, map } from 'es-toolkit';
interface SafetyState {
messages: BaseMessage[];
userId: string;
safetyChecks: {
input: boolean;
rateLimit: boolean;
content: boolean;
output: boolean;
};
violations: string[];
finalResponse?: string;
}
export function createSafetyWorkflow() {
const moderator = new ContentModerator();
const rateLimiter = new TokenRateLimiter();
const workflow = new StateGraph<SafetyState>({
channels: {
messages: {
value: (x: BaseMessage[], y: BaseMessage[]) => [...x, ...y],
default: () => []
},
userId: {
value: (x: string, y: string) => y || x,
default: () => 'anonymous'
},
safetyChecks: {
value: (x, y) => ({...x, ...y}),
default: () => ({
input: false,
rateLimit: false,
content: false,
output: false
})
},
violations: {
value: (x: string[], y: string[]) => [...x, ...y],
default: () => []
},
finalResponse: {
value: (x: string | undefined, y: string | undefined) => y || x,
default: () => undefined
}
}
});
// Input validation node
workflow.addNode('validateInput', async (state) => {
const lastMessage = state.messages[state.messages.length - 1];
const moderation = await moderator.moderateWithFallback(lastMessage.content as string);
if (!moderation.safe) {
return {
safetyChecks: { ...state.safetyChecks, input: false },
violations: [`Input violation: ${moderation.explanation}`]
};
}
return {
safetyChecks: { ...state.safetyChecks, input: true }
};
});
// Rate limiting node
workflow.addNode('checkRateLimit', async (state) => {
try {
const estimatedTokens = (state.messages[state.messages.length - 1].content as string).length / 4;
await rateLimiter.checkLimit(state.userId, estimatedTokens);
return {
safetyChecks: { ...state.safetyChecks, rateLimit: true }
};
} catch (error) {
return {
safetyChecks: { ...state.safetyChecks, rateLimit: false },
violations: [`Rate limit: ${error.message}`]
};
}
});
// Process with LLM node
workflow.addNode('processLLM', async (state) => {
const model = new ChatGoogleGenerativeAI({
modelName: 'gemini-2.5-flash',
temperature: 0.3
});
const response = await model.invoke(state.messages);
return {
messages: [response],
finalResponse: response.content as string
};
});
// Output validation node
workflow.addNode('validateOutput', async (state) => {
if (!state.finalResponse) {
return {
safetyChecks: { ...state.safetyChecks, output: false },
violations: ['No output generated']
};
}
const moderation = await moderator.moderateWithFallback(state.finalResponse);
if (!moderation.safe) {
return {
safetyChecks: { ...state.safetyChecks, output: false },
violations: [`Output violation: ${moderation.explanation}`],
finalResponse: '由于安全考虑,我无法提供该响应。'
};
}
return {
safetyChecks: { ...state.safetyChecks, output: true }
};
});
// Safety violation handler
workflow.addNode('handleViolation', async (state) => {
console.error('Safety violations:', state.violations);
return {
finalResponse: '由于安全策略,您的请求无法处理。',
messages: [new AIMessage('由于安全原因,请求被阻止')]
};
});
// Conditional routing
workflow.addConditionalEdges('validateInput', (state) => {
return state.safetyChecks.input ? 'checkRateLimit' : 'handleViolation';
});
workflow.addConditionalEdges('checkRateLimit', (state) => {
return state.safetyChecks.rateLimit ? 'processLLM' : 'handleViolation';
});
workflow.addEdge('processLLM', 'validateOutput');
workflow.addConditionalEdges('validateOutput', (state) => {
return state.safetyChecks.output ? END : 'handleViolation';
});
workflow.addEdge('handleViolation', END);
workflow.setEntryPoint('validateInput');
return workflow.compile();
}
编排多个验证阶段的完整安全工作流,具有适当的错误处理和违规跟踪。
3. 人工参与的安全审查
// lib/guards/human-review.ts
import { z } from 'zod';
import { throttle } from 'es-toolkit';
const ReviewDecisionSchema = z.object({
approved: z.boolean(),
reason: z.string().optional(),
modifications: z.string().optional()
});
export class HumanReviewSystem {
private pendingReviews = new Map<string, {
content: string;
resolve: (decision: z.infer<typeof ReviewDecisionSchema>) => void;
timestamp: Date;
}>();
async requestReview(
reviewId: string,
content: string,
context: Record<string, any>
): Promise<z.infer<typeof ReviewDecisionSchema>> {
return new Promise((resolve) => {
this.pendingReviews.set(reviewId, {
content,
resolve,
timestamp: new Date()
});
// Notify reviewers (webhook, email, etc.)
this.notifyReviewers(reviewId, content, context);
// Auto-reject after timeout
setTimeout(() => {
if (this.pendingReviews.has(reviewId)) {
this.completeReview(reviewId, {
approved: false,
reason: 'Review timeout'
});
}
}, 30000); // 30 second timeout
});
}
completeReview(
reviewId: string,
decision: z.infer<typeof ReviewDecisionSchema>
) {
const review = this.pendingReviews.get(reviewId);
if (review) {
review.resolve(decision);
this.pendingReviews.delete(reviewId);
}
}
private notifyReviewers = throttle(
async (reviewId: string, content: string, context: Record<string, any>) => {
// Send to review dashboard
await fetch(process.env.REVIEW_WEBHOOK_URL!, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ reviewId, content, context })
});
},
1000 // Throttle notifications to 1 per second
);
getPendingReviews() {
return Array.from(this.pendingReviews.entries()).map(([id, review]) => ({
id,
content: review.content,
timestamp: review.timestamp
}));
}
}
针对高风险内容的人工审查系统,具有自动超时和通知机制。
4. 完整安全管道的 API 路由
// app/api/safe-agent/route.ts
import { createSafetyWorkflow } from '@/lib/workflows/safety-workflow';
import { HumanMessage } from '@langchain/core/messages';
import { HumanReviewSystem } from '@/lib/guards/human-review';
import { NextResponse } from 'next/server';
export const runtime = 'nodejs';
export const maxDuration = 300;
const reviewSystem = new HumanReviewSystem();
export async function POST(req: Request) {
try {
const { message, userId, sessionId } = await req.json();
const workflow = createSafetyWorkflow();
// Run safety workflow
const result = await workflow.invoke({
messages: [new HumanMessage(message)],
userId,
safetyChecks: {
input: false,
rateLimit: false,
content: false,
output: false
},
violations: []
});
// Check if human review needed
const needsReview = result.violations.length > 0 &&
result.violations.some(v => v.includes('review'));
if (needsReview) {
const reviewId = `${sessionId}-${Date.now()}`;
const decision = await reviewSystem.requestReview(
reviewId,
result.finalResponse || message,
{ userId, violations: result.violations }
);
if (!decision.approved) {
return NextResponse.json({
content: '内容需要审查但未获得批准。',
reviewId
});
}
result.finalResponse = decision.modifications || result.finalResponse;
}
return NextResponse.json({
content: result.finalResponse,
safetyChecks: result.safetyChecks,
violations: result.violations
});
} catch (error) {
console.error('Agent error:', error);
return NextResponse.json(
{ error: 'Processing failed', details: error.message },
{ status: 500 }
);
}
}
集成所有安全层并为敏感内容提供人工审查升级的完整 API 路由。
5. 监控仪表板组件
// components/SafetyMonitorDashboard.tsx
'use client';
import { useState, useEffect } from 'react';
import { useQuery } from '@tanstack/react-query';
import { groupBy, map, filter } from 'es-toolkit';
interface SafetyMetric {
timestamp: Date;
type: 'input' | 'output' | 'rate_limit';
blocked: boolean;
userId: string;
}
export default function SafetyMonitorDashboard() {
const [metrics, setMetrics] = useState<SafetyMetric[]>([]);
const { data: pendingReviews } = useQuery({
queryKey: ['pending-reviews'],
queryFn: async () => {
const res = await fetch('/api/admin/pending-reviews');
return res.json();
},
refetchInterval: 5000
});
const { data: recentViolations } = useQuery({
queryKey: ['violations'],
queryFn: async () => {
const res = await fetch('/api/admin/violations');
return res.json();
},
refetchInterval: 10000
});
const violationsByType = groupBy(
recentViolations || [],
'type'
);
return (
<div className="p-6 space-y-6">
<h1 className="text-3xl font-bold">安全监控仪表板</h1>
{/* Metrics Grid */}
<div className="stats shadow w-full">
<div className="stat">
<div className="stat-figure text-primary">
<svg className="w-8 h-8" fill="none" viewBox="0 0 24 24" stroke="currentColor">
<path strokeLinecap="round" strokeLinejoin="round" strokeWidth={2} d="M9 12l2 2 4-4m6 2a9 9 0 11-18 0 9 9 0 0118 0z" />
</svg>
</div>
<div className="stat-title">安全评分</div>
<div className="stat-value text-primary">98.5%</div>
<div className="stat-desc">过去 24 小时</div>
</div>
<div className="stat">
<div className="stat-figure text-secondary">
<svg className="w-8 h-8" fill="none" viewBox="0 0 24 24" stroke="currentColor">
<path strokeLinecap="round" strokeLinejoin="round" strokeWidth={2} d="M12 9v2m0 4h.01m-6.938 4h13.856c1.54 0 2.502-1.667 1.732-3L13.732 4c-.77-1.333-2.694-1.333-3.464 0L3.34 16c-.77 1.333.192 3 1.732 3z" />
</svg>
</div>
<div className="stat-title">被阻止的请求</div>
<div className="stat-value text-secondary">{recentViolations?.length || 0}</div>
<div className="stat-desc">↗︎ 3 (2%)</div>
</div>
<div className="stat">
<div className="stat-figure text-warning">
<svg className="w-8 h-8" fill="none" viewBox="0 0 24 24" stroke="currentColor">
<path strokeLinecap="round" strokeLinejoin="round" strokeWidth={2} d="M12 8v4l3 3m6-3a9 9 0 11-18 0 9 9 0 0118 0z" />
</svg>
</div>
<div className="stat-title">待审查项目</div>
<div className="stat-value text-warning">{pendingReviews?.length || 0}</div>
<div className="stat-desc">平均响应:45秒</div>
</div>
</div>
{/* Pending Reviews Table */}
{pendingReviews?.length > 0 && (
<div className="card bg-base-100 shadow-xl">
<div className="card-body">
<h2 className="card-title">待人工审查</h2>
<div className="overflow-x-auto">
<table className="table">
<thead>
<tr>
<th>ID</th>
<th>内容预览</th>
<th>等待时间</th>
<th>操作</th>
</tr>
</thead>
<tbody>
{pendingReviews.map((review: any) => (
<tr key={review.id}>
<td>{review.id.substring(0, 8)}...</td>
<td className="max-w-xs truncate">{review.content}</td>
<td>{Math.round((Date.now() - new Date(review.timestamp).getTime()) / 1000)}秒</td>
<td>
<div className="btn-group">
<button className="btn btn-sm btn-success">批准</button>
<button className="btn btn-sm btn-error">拒绝</button>
</div>
</td>
</tr>
))}
</tbody>
</table>
</div>
</div>
</div>
)}
{/* Recent Violations */}
<div className="card bg-base-100 shadow-xl">
<div className="card-body">
<h2 className="card-title">按类型分类的近期违规</h2>
<div className="grid grid-cols-2 gap-4">
{Object.entries(violationsByType).map(([type, violations]) => (
<div key={type} className="stat bg-base-200 rounded-box">
<div className="stat-title capitalize">{type}</div>
<div className="stat-value text-2xl">{violations.length}</div>
<div className="stat-desc">
{violations.slice(0, 2).map((v: any) => (
<div key={v.id} className="text-xs truncate">
{v.reason}
</div>
))}
</div>
</div>
))}
</div>
</div>
</div>
</div>
);
}
实时监控仪表板,显示安全指标、待审查项目和违规趋势。
结论
为 LLM 智能体实施防护和安全模式需要多层方法,结合输入验证、内容审核、速率限制和人工监督。基础模式通过最小的延迟影响提供快速收益,而使用 LangGraph 的高级工作流能够实现复杂的安全编排。关键要点包括使用记忆化提高性能、实施回退机制确保可靠性,以及维护全面的审计跟踪以满足合规要求。请记住,安全不是一次性实施,而是一个持续的过程,需要随着新攻击向量的出现不断监控、调整和改进。