草案 智能体设计模式 - 工具使用
工具使用(函数调用)使AI智能体能够与外部系统、API和数据库进行交互,将它们从被动响应者转变为主动执行者。本指南演示了如何在Vercel的无服务器平台上使用LangChain、LangGraph和Next.js 15实现生产就绪的工具使用模式。
心智模型:AI作为操作系统
将工具使用想象成操作系统中的系统调用。就像程序使用系统调用与硬件和资源交互一样,AI智能体使用工具与外部系统交互。LLM充当"内核",根据用户意图决定调用哪些工具。LangChain提供标准化工具接口的"驱动层",而LangGraph管理复杂多工具工作流的"进程调度"。这种架构使智能体能够执行数据库查询、API调用和计算等操作 - 类似于操作系统协调文件I/O、网络请求和CPU操作的方式。
基础工具实现
1. 使用Zod模式定义工具
// lib/tools/weather-tool.ts
import { z } from 'zod';
import { tool } from '@langchain/core/tools';
import { get } from 'es-toolkit/object';
import { isString } from 'es-toolkit/predicate';
export const weatherTool = tool(
async ({ location, unit = 'celsius' }) => {
// 模拟API调用 - 替换为真实的天气API
const response = await fetch(
`https://api.weather.example/current?q=${location}&units=${unit}`
);
const data = await response.json();
// 使用es-toolkit进行安全的数据访问
const temperature = get(data, 'main.temp');
const description = get(data, 'weather[0].description');
if (!isString(description)) {
throw new Error('接收到无效的天气数据');
}
return `${location}的当前天气:${temperature}°${
unit === 'celsius' ? 'C' : 'F'
}, ${description}`;
},
{
name: 'get_weather',
description: '获取位置的当前天气',
schema: z.object({
location: z.string().describe('城市名称或位置'),
unit: z.enum(['celsius', 'fahrenheit']).optional(),
}),
}
);
使用Zod模式验证定义天气工具,确保输入参数和TypeScript集成的类型安全。
2. 创建计算器工具
// lib/tools/calculator-tool.ts
import { z } from 'zod';
import { tool } from '@langchain/core/tools';
import { evaluate } from 'es-toolkit/math';
import { attempt } from 'es-toolkit/function';
export const calculatorTool = tool(
async ({ expression }) => {
// 使用es-toolkit的attempt进行安全执行
const result = attempt(() => {
// 生产环境中,使用像mathjs这样的适当数学解析器
return evaluate(expression);
});
if (result instanceof Error) {
return `错误:无效表达式"${expression}"`;
}
return `结果:${result}`;
},
{
name: 'calculator',
description: '执行数学计算',
schema: z.object({
expression: z.string().describe('要计算的数学表达式'),
}),
}
);
使用es-toolkit的attempt模式实现带有错误处理的计算器工具,确保安全执行。
3. 数据库查询工具
// lib/tools/database-tool.ts
import { z } from 'zod';
import { tool } from '@langchain/core/tools';
import { sql } from '@vercel/postgres';
import { mapValues, pick } from 'es-toolkit/object';
import { chunk } from 'es-toolkit/array';
export const databaseTool = tool(
async ({ query, parameters = {} }) => {
try {
// 清理和验证查询
if (query.toLowerCase().includes('drop') ||
query.toLowerCase().includes('delete')) {
return '错误:不允许破坏性操作';
}
// 使用参数执行查询
const result = await sql.query(query, Object.values(parameters));
// 为大型数据集处理结果分块
const rows = result.rows;
const chunks = chunk(rows, 100);
// 为无服务器内存限制返回第一个块
return JSON.stringify({
rowCount: result.rowCount,
data: chunks[0] || [],
hasMore: chunks.length > 1,
});
} catch (error) {
return `数据库错误:${error.message}`;
}
},
{
name: 'query_database',
description: '在数据库上执行安全的SELECT查询',
schema: z.object({
query: z.string().describe('SQL SELECT查询'),
parameters: z.record(z.any()).optional().describe('查询参数'),
}),
}
);
创建包含安全检查和分块结果处理的数据库工具,针对无服务器内存约束进行了优化。
4. 将工具绑定到LLM
// lib/agents/tool-agent.ts
import { ChatGoogleGenerativeAI } from '@langchain/google-genai';
import { weatherTool } from '@/lib/tools/weather-tool';
import { calculatorTool } from '@/lib/tools/calculator-tool';
import { databaseTool } from '@/lib/tools/database-tool';
import { pipe } from 'es-toolkit/function';
export function createToolAgent() {
const model = new ChatGoogleGenerativeAI({
modelName: 'gemini-2.5-pro',
temperature: 0,
streaming: true,
});
const tools = [weatherTool, calculatorTool, databaseTool];
// 将工具绑定到模型
const modelWithTools = model.bindTools(tools);
return {
model: modelWithTools,
tools,
// 格式化工具调用的辅助函数
formatToolCall: pipe(
(call: any) => call.args,
JSON.stringify
),
};
}
将多个工具绑定到LLM,使其能够根据用户查询决定使用哪些工具。
5. 带工具执行的API路由
// app/api/tools/route.ts
import { NextResponse } from 'next/server';
import { createToolAgent } from '@/lib/agents/tool-agent';
import { HumanMessage } from '@langchain/core/messages';
import { debounce } from 'es-toolkit/function';
export const runtime = 'nodejs';
export const maxDuration = 777; // 800秒限制的安全缓冲
// 防抖并发请求
const processRequest = debounce(async (message: string) => {
const { model, tools } = createToolAgent();
// 获取包含工具调用的LLM响应
const response = await model.invoke([
new HumanMessage(message)
]);
// 如果存在工具调用则执行
if (response.tool_calls && response.tool_calls.length > 0) {
const toolResults = await Promise.all(
response.tool_calls.map(async (toolCall) => {
const tool = tools.find(t => t.name === toolCall.name);
if (!tool) return null;
const result = await tool.invoke(toolCall.args);
return {
tool: toolCall.name,
result,
};
})
);
return { toolResults, message: response.content };
}
return { message: response.content };
}, 100);
export async function POST(req: Request) {
try {
const { message } = await req.json();
const result = await processRequest(message);
return NextResponse.json(result);
} catch (error) {
return NextResponse.json(
{ error: '工具执行失败' },
{ status: 500 }
);
}
}
实现处理消息、执行工具调用并返回结果的无服务器API路由,带有速率限制的防抖功能。
高级工具编排
1. 并行工具执行
// lib/orchestration/parallel-tools.ts
import { ChatGoogleGenerativeAI } from '@langchain/google-genai';
import { RunnableParallel } from '@langchain/core/runnables';
import { groupBy } from 'es-toolkit/array';
import { mapValues } from 'es-toolkit/object';
export class ParallelToolOrchestrator {
private model: ChatGoogleGenerativeAI;
private tools: Map<string, any>;
constructor(tools: any[]) {
this.model = new ChatGoogleGenerativeAI({
modelName: 'gemini-2.5-pro',
temperature: 0,
});
this.tools = new Map(tools.map(t => [t.name, t]));
this.model = this.model.bindTools(tools);
}
async executeParallel(message: string) {
// 从LLM获取工具调用
const response = await this.model.invoke([
{ role: 'user', content: message }
]);
if (!response.tool_calls?.length) {
return { message: response.content };
}
// 按依赖性分组工具(独立工具可以并行运行)
const toolGroups = this.groupIndependentTools(response.tool_calls);
// 并行执行每个组
const results = [];
for (const group of toolGroups) {
const groupResults = await Promise.all(
group.map(async (call) => {
const tool = this.tools.get(call.name);
if (!tool) return null;
const startTime = Date.now();
const result = await tool.invoke(call.args);
const duration = Date.now() - startTime;
return {
tool: call.name,
result,
duration,
timestamp: new Date().toISOString(),
};
})
);
results.push(...groupResults.filter(Boolean));
}
return {
message: response.content,
toolResults: results,
parallelGroups: toolGroups.length,
totalDuration: Math.max(...results.map(r => r.duration)),
};
}
private groupIndependentTools(toolCalls: any[]) {
// 简单启发式:没有共享参数的工具可以并行运行
const groups: any[][] = [];
const used = new Set<number>();
toolCalls.forEach((call, i) => {
if (used.has(i)) return;
const group = [call];
used.add(i);
// 查找其他独立工具
toolCalls.forEach((other, j) => {
if (i !== j && !used.has(j) && this.areIndependent(call, other)) {
group.push(other);
used.add(j);
}
});
groups.push(group);
});
return groups;
}
private areIndependent(tool1: any, tool2: any): boolean {
// 如果工具不共享数据依赖关系,则它们是独立的
const args1 = JSON.stringify(tool1.args);
const args2 = JSON.stringify(tool2.args);
// 简单检查:没有共享值
return !Object.values(tool1.args).some(v =>
args2.includes(String(v))
);
}
}
实现带有依赖性分析的并行工具执行,以最大化无服务器环境中的吞吐量。
2. 带错误恢复的工具链
// lib/orchestration/tool-chain.ts
import { StateGraph, END } from '@langchain/langgraph';
import { BaseMessage } from '@langchain/core/messages';
import { retry, delay } from 'es-toolkit/promise';
import { pipe } from 'es-toolkit/function';
interface ChainState {
messages: BaseMessage[];
toolResults: any[];
errors: any[];
retryCount: number;
}
export function createToolChain(tools: any[]) {
const graph = new StateGraph<ChainState>({
channels: {
messages: {
value: (x, y) => [...x, ...y],
default: () => [],
},
toolResults: {
value: (x, y) => [...x, ...y],
default: () => [],
},
errors: {
value: (x, y) => [...x, ...y],
default: () => [],
},
retryCount: {
value: (x, y) => y,
default: () => 0,
},
},
});
// 使用重试逻辑执行工具
graph.addNode('execute_tool', async (state) => {
const currentTool = tools[state.toolResults.length];
if (!currentTool) {
return { ...state };
}
try {
const result = await retry(
async () => {
const res = await currentTool.invoke(
state.messages[state.messages.length - 1]
);
return res;
},
{
times: 3,
delay: 1000,
onRetry: (error, attempt) => {
console.log(`${currentTool.name}的重试${attempt}:`, error);
},
}
);
return {
toolResults: [{
tool: currentTool.name,
result,
success: true
}],
};
} catch (error) {
return {
errors: [{
tool: currentTool.name,
error: error.message
}],
retryCount: state.retryCount + 1,
};
}
});
// 错误的回退节点
graph.addNode('handle_error', async (state) => {
const lastError = state.errors[state.errors.length - 1];
// 尝试替代工具或优雅降级
const fallbackResult = {
tool: 'fallback',
result: `执行${lastError.tool}失败。使用缓存结果。`,
fallback: true,
};
return {
toolResults: [fallbackResult],
};
});
// 决策节点
graph.addNode('check_status', async (state) => {
if (state.errors.length > 0 && state.retryCount < 3) {
return 'handle_error';
}
if (state.toolResults.length < tools.length) {
return 'execute_tool';
}
return END;
});
// 定义边
graph.setEntryPoint('execute_tool');
graph.addEdge('execute_tool', 'check_status');
graph.addEdge('handle_error', 'execute_tool');
graph.addConditionalEdges('check_status', async (state) => {
if (state.errors.length > 0 && state.retryCount < 3) {
return 'handle_error';
}
if (state.toolResults.length < tools.length) {
return 'execute_tool';
}
return END;
});
return graph.compile();
}
创建具有自动重试逻辑和回退机制的弹性工具链,以提高生产可靠性。
3. 动态工具选择
// lib/orchestration/dynamic-tools.ts
import { ChatGoogleGenerativeAI } from '@langchain/google-genai';
import { z } from 'zod';
import { tool } from '@langchain/core/tools';
import { maxBy, minBy } from 'es-toolkit/array';
import { memoize } from 'es-toolkit/function';
export class DynamicToolSelector {
private model: ChatGoogleGenerativeAI;
private toolRegistry: Map<string, any>;
private performanceMetrics: Map<string, any>;
constructor() {
this.model = new ChatGoogleGenerativeAI({
modelName: 'gemini-2.5-flash',
temperature: 0,
});
this.toolRegistry = new Map();
this.performanceMetrics = new Map();
}
// 使用元数据注册工具
registerTool(toolDef: any, metadata: {
cost: number;
latency: number;
reliability: number;
capabilities: string[];
}) {
this.toolRegistry.set(toolDef.name, {
tool: toolDef,
metadata,
});
}
// 为性能进行记忆化工具选择
private selectOptimalTool = memoize(
async (requirement: string, constraints: any) => {
const availableTools = Array.from(this.toolRegistry.values());
// 根据需求为每个工具评分
const scoredTools = await Promise.all(
availableTools.map(async (entry) => {
const score = this.calculateToolScore(
entry,
requirement,
constraints
);
return { ...entry, score };
})
);
// 选择最佳工具
const bestTool = maxBy(scoredTools, t => t.score);
return bestTool?.tool;
},
{
getCacheKey: (req, cons) => `${req}-${JSON.stringify(cons)}`,
}
);
private calculateToolScore(
entry: any,
requirement: string,
constraints: any
): number {
const { metadata } = entry;
let score = 0;
// 匹配能力
const capabilityMatch = metadata.capabilities.some((cap: string) =>
requirement.toLowerCase().includes(cap.toLowerCase())
);
if (capabilityMatch) score += 40;
// 考虑约束
if (constraints.maxLatency && metadata.latency <= constraints.maxLatency) {
score += 20;
}
if (constraints.maxCost && metadata.cost <= constraints.maxCost) {
score += 20;
}
// 可靠性奖励
score += metadata.reliability * 20;
// 历史性能
const metrics = this.performanceMetrics.get(entry.tool.name);
if (metrics?.successRate) {
score += metrics.successRate * 10;
}
return score;
}
async executeDynamic(
requirement: string,
constraints: {
maxLatency?: number;
maxCost?: number;
preferredTools?: string[];
} = {}
) {
// 选择最优工具
const selectedTool = await this.selectOptimalTool(
requirement,
constraints
);
if (!selectedTool) {
throw new Error('找不到适合需求的工具');
}
const startTime = Date.now();
try {
// 执行所选工具
const result = await selectedTool.invoke({ query: requirement });
// 更新性能指标
this.updateMetrics(selectedTool.name, {
success: true,
latency: Date.now() - startTime,
});
return {
tool: selectedTool.name,
result,
metrics: {
latency: Date.now() - startTime,
cost: this.toolRegistry.get(selectedTool.name)?.metadata.cost,
},
};
} catch (error) {
// 更新失败指标
this.updateMetrics(selectedTool.name, {
success: false,
error: error.message,
});
throw error;
}
}
private updateMetrics(toolName: string, metrics: any) {
const existing = this.performanceMetrics.get(toolName) || {
totalCalls: 0,
successfulCalls: 0,
totalLatency: 0,
};
existing.totalCalls++;
if (metrics.success) {
existing.successfulCalls++;
existing.totalLatency += metrics.latency;
}
existing.successRate = existing.successfulCalls / existing.totalCalls;
existing.avgLatency = existing.totalLatency / existing.successfulCalls;
this.performanceMetrics.set(toolName, existing);
}
}
基于需求、约束和历史性能指标实现动态工具选择。
4. 流式工具结果
// app/api/stream-tools/route.ts
import { createToolAgent } from '@/lib/agents/tool-agent';
import { ParallelToolOrchestrator } from '@/lib/orchestration/parallel-tools';
export const runtime = 'nodejs';
export const maxDuration = 777;
export async function POST(req: Request) {
const { message } = await req.json();
const encoder = new TextEncoder();
const stream = new TransformStream();
const writer = stream.writable.getWriter();
const { model, tools } = createToolAgent();
const orchestrator = new ParallelToolOrchestrator(tools);
// 在后台处理
(async () => {
try {
// 流式传输初始思考
await writer.write(
encoder.encode(`data: ${JSON.stringify({
type: 'thinking',
content: '分析请求并选择工具...'
})}\n\n`)
);
// 从LLM获取工具调用
const response = await model.invoke([
{ role: 'user', content: message }
]);
if (response.tool_calls) {
// 流式传输工具执行更新
for (const call of response.tool_calls) {
await writer.write(
encoder.encode(`data: ${JSON.stringify({
type: 'tool_start',
tool: call.name,
args: call.args
})}\n\n`)
);
const tool = tools.find(t => t.name === call.name);
if (tool) {
const result = await tool.invoke(call.args);
await writer.write(
encoder.encode(`data: ${JSON.stringify({
type: 'tool_complete',
tool: call.name,
result
})}\n\n`)
);
}
}
}
// 流式传输最终响应
await writer.write(
encoder.encode(`data: ${JSON.stringify({
type: 'complete',
content: response.content
})}\n\n`)
);
} catch (error) {
await writer.write(
encoder.encode(`data: ${JSON.stringify({
type: 'error',
error: error.message
})}\n\n`)
);
} finally {
await writer.close();
}
})();
return new Response(stream.readable, {
headers: {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
},
});
}
实现服务器发送事件以实时流式传输工具执行进度。
5. 使用TanStack Query的前端集成
// components/ToolInterface.tsx
'use client';
import { useMutation } from '@tanstack/react-query';
import { useState, useCallback } from 'react';
import { debounce } from 'es-toolkit/function';
interface ToolResult {
tool: string;
result: any;
duration?: number;
}
export default function ToolInterface() {
const [input, setInput] = useState('');
const [streamedResults, setStreamedResults] = useState<ToolResult[]>([]);
const [isStreaming, setIsStreaming] = useState(false);
const streamTools = useCallback(
debounce(async (message: string) => {
setIsStreaming(true);
setStreamedResults([]);
const eventSource = new EventSource(
`/api/stream-tools?message=${encodeURIComponent(message)}`
);
eventSource.onmessage = (event) => {
const data = JSON.parse(event.data);
switch (data.type) {
case 'tool_start':
setStreamedResults(prev => [...prev, {
tool: data.tool,
result: '执行中...',
}]);
break;
case 'tool_complete':
setStreamedResults(prev =>
prev.map(r =>
r.tool === data.tool
? { ...r, result: data.result }
: r
)
);
break;
case 'complete':
setIsStreaming(false);
eventSource.close();
break;
case 'error':
console.error('流错误:', data.error);
setIsStreaming(false);
eventSource.close();
break;
}
};
return () => eventSource.close();
}, 300),
[]
);
const executeTool = useMutation({
mutationFn: async (message: string) => {
const response = await fetch('/api/tools', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ message }),
});
return response.json();
},
});
const handleSubmit = (e: React.FormEvent) => {
e.preventDefault();
if (input.trim()) {
streamTools(input);
}
};
return (
<div className="card bg-base-100 shadow-xl">
<div className="card-body">
<h2 className="card-title">AI工具执行器</h2>
<form onSubmit={handleSubmit}>
<div className="form-control">
<label className="label">
<span className="label-text">我能为您做什么?</span>
</label>
<textarea
className="textarea textarea-bordered h-24"
placeholder="计算144的平方根,然后检查东京的天气"
value={input}
onChange={(e) => setInput(e.target.value)}
/>
</div>
<div className="form-control mt-4">
<button
type="submit"
className="btn btn-primary"
disabled={isStreaming || !input.trim()}
>
{isStreaming ? (
<>
<span className="loading loading-spinner"></span>
执行工具中...
</>
) : '执行'}
</button>
</div>
</form>
{streamedResults.length > 0 && (
<div className="mt-6">
<h3 className="text-lg font-semibold mb-3">工具结果:</h3>
<div className="space-y-3">
{streamedResults.map((result, idx) => (
<div key={idx} className="alert alert-info">
<div>
<span className="font-bold">{result.tool}:</span>
<pre className="mt-2 text-sm">{
typeof result.result === 'object'
? JSON.stringify(result.result, null, 2)
: result.result
}</pre>
</div>
</div>
))}
</div>
</div>
)}
</div>
</div>
);
}
使用服务器发送事件实时流式传输工具执行结果的React组件。
6. 工具监控仪表板
// app/tools/dashboard/page.tsx
'use client';
import { useQuery } from '@tanstack/react-query';
import { groupBy } from 'es-toolkit/array';
import { useState, useEffect } from 'react';
interface ToolMetrics {
name: string;
calls: number;
avgLatency: number;
successRate: number;
lastUsed: string;
}
export default function ToolDashboard() {
const [metrics, setMetrics] = useState<ToolMetrics[]>([]);
const { data: liveMetrics } = useQuery({
queryKey: ['tool-metrics'],
queryFn: async () => {
const res = await fetch('/api/tools/metrics');
return res.json();
},
refetchInterval: 5000, // 每5秒轮询一次
});
useEffect(() => {
if (liveMetrics) {
setMetrics(liveMetrics);
}
}, [liveMetrics]);
const totalCalls = metrics.reduce((sum, m) => sum + m.calls, 0);
const avgSuccessRate = metrics.length > 0
? metrics.reduce((sum, m) => sum + m.successRate, 0) / metrics.length
: 0;
return (
<div className="container mx-auto p-6">
<h1 className="text-4xl font-bold mb-8">工具监控仪表板</h1>
<div className="stats shadow mb-8">
<div className="stat">
<div className="stat-title">工具调用总数</div>
<div className="stat-value">{totalCalls}</div>
<div className="stat-desc">所有工具</div>
</div>
<div className="stat">
<div className="stat-title">平均成功率</div>
<div className="stat-value">{(avgSuccessRate * 100).toFixed(1)}%</div>
<div className="stat-desc">系统可靠性</div>
</div>
<div className="stat">
<div className="stat-title">活动工具</div>
<div className="stat-value">{metrics.length}</div>
<div className="stat-desc">当前已注册</div>
</div>
</div>
<div className="overflow-x-auto">
<table className="table table-zebra w-full">
<thead>
<tr>
<th>工具名称</th>
<th>调用次数</th>
<th>平均延迟</th>
<th>成功率</th>
<th>最后使用</th>
<th>状态</th>
</tr>
</thead>
<tbody>
{metrics.map((metric) => (
<tr key={metric.name}>
<td className="font-medium">{metric.name}</td>
<td>{metric.calls}</td>
<td>{metric.avgLatency.toFixed(0)}毫秒</td>
<td>
<div className="flex items-center gap-2">
<progress
className="progress progress-success w-20"
value={metric.successRate * 100}
max="100"
/>
<span>{(metric.successRate * 100).toFixed(1)}%</span>
</div>
</td>
<td>{new Date(metric.lastUsed).toLocaleString()}</td>
<td>
<div className={`badge ${
metric.successRate > 0.95 ? 'badge-success' :
metric.successRate > 0.8 ? 'badge-warning' :
'badge-error'
}`}>
{metric.successRate > 0.95 ? '健康' :
metric.successRate > 0.8 ? '降级' :
'危急'}
</div>
</td>
</tr>
))}
</tbody>
</table>
</div>
</div>
);
}
实时监控仪表板,用于跟踪工具性能、成功率和系统健康状况。
结论
工具使用模式将AI智能体从被动响应者转变为能够与外部系统交互的主动执行者。此实现展示了包括并行执行、错误恢复、动态选择和实时流式传输在内的生产就绪模式 - 所有这些都针对Vercel的无服务器平台进行了优化,具有777秒的安全缓冲。LangChain的标准化工具接口、LangGraph的编排功能和es-toolkit的实用函数的组合,为构建能够在生产环境中可靠执行复杂多步骤工作流的复杂AI智能体创建了坚实的基础。