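Overview
Model Context Protocol (MCP) is one of the key standards in the 2026 agent ecosystem: it gives LLMs and external tools a single, uniform interface. This guide walks you through building MCP servers from scratch and integrating them into several agent frameworks.
MCP Core Concepts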
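| Component | Description | Implementation |
|---|---|---|
| Tools | Functions the agent can call | Define input/output schemas and execution logic |
| Resources | Data sources the agent can read | Expose a read interface for the data |
| Prompts | Predefined prompt templates | Manage complex prompt structures |
| Servers | Services that implement the MCP protocol | Handle connections and message routing |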
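To make the table concrete, here is a minimal sketch of the JSON-RPC 2.0 message a client sends when it calls a tool. The method name `tools/call` and the `name`/`arguments` fields follow the MCP specification; the `buildCallToolRequest` helper and the interface names are illustrative assumptions, not part of any SDK.

```typescript
// Sketch of the JSON-RPC 2.0 request behind a tool call.
// Interface and helper names are hypothetical; only the wire fields come from MCP.

interface JsonRpcRequest<P> {
  jsonrpc: "2.0";
  id: number;
  method: string;
  params?: P;
}

interface CallToolParams {
  name: string;
  arguments?: Record<string, unknown>;
}

let nextId = 1;

// Build the request a client would send to invoke a tool.
function buildCallToolRequest(
  name: string,
  args: Record<string, unknown>,
): JsonRpcRequest<CallToolParams> {
  return {
    jsonrpc: "2.0",
    id: nextId++,
    method: "tools/call",
    params: { name, arguments: args },
  };
}

const request = buildCallToolRequest("hello", { name: "Ada" });
console.log(JSON.stringify(request, null, 2));
```

Listing tools, reading resources, and fetching prompts use the same envelope with the methods `tools/list`, `resources/read`, and `prompts/get`.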
Quick Start
1. Install the MCP development environment
# Install the MCP CLI
npm install -g @modelcontextprotocol/cli
# Install the TypeScript SDK
npm install @modelcontextprotocol/sdk
# Initialize a project
npm init @modelcontextprotocol/server
2. Create your first MCP server
// server.ts
import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import {
CallToolRequestSchema,
ListToolsRequestSchema,
} from "@modelcontextprotocol/sdk/types.js";
const server = new Server(
{
name: "hello-world-server",
version: "1.0.0",
},
{
capabilities: {
tools: {},
},
}
);
// Declare the tools
server.setRequestHandler(ListToolsRequestSchema, async () => {
return {
tools: [
{
name: "hello",
description: "Returns a greeting",
inputSchema: {
type: "object",
properties: {
name: {
type: "string",
description: "Name of the person to greet",
},
},
required: ["name"],
},
},
],
};
});
// Implement the tool
server.setRequestHandler(CallToolRequestSchema, async (request) => {
const { name, arguments: args } = request.params;
if (name === "hello") {
const greeting = `Hello, ${args?.name}! Welcome to MCP development.`;
return {
content: [
{
type: "text",
text: greeting,
},
],
};
}
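throw new Error(`Unknown tool: ${name}`);
});
// Start the server
async function main() {
const transport = new StdioServerTransport();
await server.connect(transport);
// With the stdio transport, stdout carries protocol messages, so log to stderr.
console.error("MCP server started");
}
main().catch((error) => {
console.error(error);
process.exit(1);
});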
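The handler above trusts that `arguments` matches the declared `inputSchema`, but a client may send anything. A minimal sketch of validating the input before use follows; the `HelloArgs` type and the `isHelloArgs` and `greet` helpers are hypothetical names introduced for illustration.

```typescript
// Hypothetical type guard for the `hello` tool's arguments.
interface HelloArgs {
  name: string;
}

function isHelloArgs(value: unknown): value is HelloArgs {
  return (
    typeof value === "object" &&
    value !== null &&
    typeof (value as Record<string, unknown>).name === "string"
  );
}

// Validate first, then build the greeting from the narrowed type.
function greet(rawArgs: unknown): string {
  if (!isHelloArgs(rawArgs)) {
    throw new Error("Invalid arguments: expected { name: string }");
  }
  return `Hello, ${rawArgs.name}!`;
}

console.log(greet({ name: "Ada" })); // prints "Hello, Ada!"
```

For larger schemas, a schema validation library is usually a better fit than hand-written guards.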
3. Run the server
# Run directly with tsx
npx tsx server.ts
# Or compile first, then run
npm run build
node dist/server.js
Practical MCP Servers
1. File system server
// filesystem-server.ts
import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import {
CallToolRequestSchema,
ListToolsRequestSchema,
} from "@modelcontextprotocol/sdk/types.js";
import { promises as fs } from "fs";
import path from "path";
const server = new Server(
{
name: "filesystem-server",
version: "1.0.0",
},
{
capabilities: {
tools: {},
},
}
);
// File operation tools
server.setRequestHandler(ListToolsRequestSchema, async () => {
return {
tools: [
{
name: "read_file",
description: "Read the contents of a file",
inputSchema: {
type: "object",
properties: {
path: {
type: "string",
description: "File path",
},
},
required: ["path"],
},
},
{
name: "write_file",
description: "Write content to a file",
inputSchema: {
type: "object",
properties: {
path: {
type: "string",
description: "File path",
},
content: {
type: "string",
description: "File content",
},
},
required: ["path", "content"],
},
},
{
name: "list_directory",
description: "List the contents of a directory",
inputSchema: {
type: "object",
properties: {
path: {
type: "string",
description: "Directory path",
},
},
required: ["path"],
},
},
],
};
});
server.setRequestHandler(CallToolRequestSchema, async (request) => {
const { name, arguments: args } = request.params;
try {
switch (name) {
case "read_file": {
const content = await fs.readFile(String(args?.path), "utf-8");
return {
content: [
{
type: "text",
text: content,
},
],
};
}
case "write_file": {
await fs.writeFile(String(args?.path), String(args?.content), "utf-8");
return {
content: [
{
type: "text",
text: `File written: ${String(args?.path)}`,
},
],
};
}
case "list_directory": {
const dirPath = String(args?.path);
const items = await fs.readdir(dirPath);
const details = await Promise.all(
items.map(async (item) => {
const fullPath = path.join(dirPath, item);
const stats = await fs.stat(fullPath);
return {
name: item,
path: fullPath,
isDirectory: stats.isDirectory(),
size: stats.size,
modified: stats.mtime.toISOString(),
};
})
);
return {
content: [
{
type: "text",
text: JSON.stringify(details, null, 2),
},
],
};
}
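default:
throw new Error(`Unknown tool: ${name}`);
}
} catch (error) {
// A caught value is `unknown` in TypeScript, so narrow it before reading `.message`
const message = error instanceof Error ? error.message : String(error);
throw new Error(`Operation failed: ${message}`);
}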
});
const transport = new StdioServerTransport();
server.connect(transport);
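The file system server above passes caller-supplied paths straight to `fs`, so a request for `../../etc/passwd` would be honored. A minimal sketch of confining paths to one directory follows; `ALLOWED_ROOT` and `resolveInsideRoot` are hypothetical names, and the root path is a placeholder you would take from configuration.

```typescript
import * as path from "node:path";

// Hypothetical root directory; a real server would read this from configuration.
const ALLOWED_ROOT = path.resolve("/srv/mcp-data");

// Resolve a caller-supplied path and reject anything that escapes the root.
function resolveInsideRoot(root: string, userPath: string): string {
  const resolved = path.resolve(root, userPath);
  const relative = path.relative(root, resolved);
  const escapes =
    relative === ".." ||
    relative.startsWith(`..${path.sep}`) ||
    path.isAbsolute(relative);
  if (escapes) {
    throw new Error(`Path escapes the allowed root: ${userPath}`);
  }
  return resolved;
}

console.log(resolveInsideRoot(ALLOWED_ROOT, "notes/todo.txt"));
```

Each tool would call `resolveInsideRoot` before touching the disk. This check is purely lexical: it does not follow symbolic links, so a stricter server would also compare the result of `fs.realpath` against the root.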
2. Database MCP server
// database-server.ts
import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import {
CallToolRequestSchema,
ListToolsRequestSchema,
} from "@modelcontextprotocol/sdk/types.js";
// sqlite3 supplies the driver; the sqlite wrapper supplies open() and the promise-based Database type
import sqlite3 from "sqlite3";
import { open, type Database } from "sqlite";
const server = new Server(
{
name: "database-server",
version: "1.0.0",
},
{
capabilities: {
tools: {},
},
}
);
// Database connection, opened lazily on the first tool call
let db: Database;
async function initDatabase() {
db = await open({
filename: "./database.db",
driver: sqlite3.Database,
});
// Create a sample table
await db.exec(`
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
email TEXT UNIQUE NOT NULL,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
)
`);
}
server.setRequestHandler(ListToolsRequestSchema, async () => {
return {
tools: [
{
name: "query",
description: "Run a SQL query",
inputSchema: {
type: "object",
properties: {
sql: {
type: "string",
description: "SQL statement",
},
params: {
type: "array",
items: {
type: "string",
},
description: "Query parameters",
},
},
required: ["sql"],
},
},
{
name: "insert",
description: "Insert a row",
inputSchema: {
type: "object",
properties: {
table: {
type: "string",
description: "Table name",
},
data: {
type: "object",
description: "Column values to insert",
additionalProperties: {
type: "string",
},
},
},
required: ["table", "data"],
},
},
],
};
});
server.setRequestHandler(CallToolRequestSchema, async (request) => {
const { name, arguments: args } = request.params;
if (!db) {
await initDatabase();
}
try {
switch (name) {
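case "query": {
// db.all prepares, runs, and finalizes the statement in one call
const rawParams = args?.params;
const params = Array.isArray(rawParams) ? rawParams : [];
const result = await db.all(String(args?.sql), params);
return {
content: [
{
type: "text",
text: JSON.stringify(result, null, 2),
},
],
};
}
case "insert": {
const table = String(args?.table);
const data = (args?.data ?? {}) as Record<string, string>;
const columns = Object.keys(data);
// Identifiers cannot be bound as parameters, so validate them before interpolating
const identifier = /^[A-Za-z_][A-Za-z0-9_]*$/;
if (columns.length === 0 || ![table, ...columns].every((id) => identifier.test(id))) {
throw new Error("Invalid table or column name");
}
const placeholders = columns.map(() => "?").join(", ");
const values = Object.values(data);
const sql = `INSERT INTO ${table} (${columns.join(", ")}) VALUES (${placeholders})`;
await db.run(sql, values);
return {
content: [
{
type: "text",
text: `Inserted a row into ${table}`,
},
],
};
}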
default:
throw new Error(`Unknown tool: ${name}`);
}
} catch (error) {
const message = error instanceof Error ? error.message : String(error);
throw new Error(`Database operation failed: ${message}`);
}
});
const transport = new StdioServerTransport();
server.connect(transport);
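The `query` tool above runs whatever SQL the agent sends, including `DROP TABLE`. One possible first line of defense, sketched below, is to accept only a single `SELECT` statement. The `assertReadOnlyQuery` helper is a hypothetical name, and the check is a coarse heuristic rather than a SQL parser: it also rejects legitimate queries that contain a semicolon inside a string literal.

```typescript
// Coarse guard for a "query" tool: allow one SELECT statement, nothing else.
function assertReadOnlyQuery(sql: string): string {
  // Drop a single trailing semicolon, then refuse any that remain.
  const trimmed = sql.trim().replace(/;\s*$/, "");
  if (trimmed.includes(";")) {
    throw new Error("Multiple statements are not allowed");
  }
  if (!/^select\b/i.test(trimmed)) {
    throw new Error("Only SELECT statements are allowed");
  }
  return trimmed;
}

console.log(assertReadOnlyQuery("SELECT id, name FROM users;"));
```

Opening the database file in read-only mode for this tool gives a stronger guarantee than any string check; the guard is best treated as an extra layer on top of that.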
3. GitHub integration server
// github-server.ts
import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import {
CallToolRequestSchema,
ListToolsRequestSchema,
} from "@modelcontextprotocol/sdk/types.js";
import { Octokit } from "@octokit/rest";
const server = new Server(
{
name: "github-server",
version: "1.0.0",
},
{
capabilities: {
tools: {},
},
}
);
const octokit = new Octokit({
auth: process.env.GITHUB_TOKEN,
});
server.setRequestHandler(ListToolsRequestSchema, async () => {
return {
tools: [
{
name: "get_issue",
description: "Get the details of an issue",
inputSchema: {
type: "object",
properties: {
owner: {
type: "string",
description: "Repository owner",
},
repo: {
type: "string",
description: "Repository name",
},
issue_number: {
type: "number",
description: "Issue number",
},
},
required: ["owner", "repo", "issue_number"],
},
},
{
name: "create_issue",
description: "Create a new issue",
inputSchema: {
type: "object",
properties: {
owner: {
type: "string",
description: "Repository owner",
},
repo: {
type: "string",
description: "Repository name",
},
title: {
type: "string",
description: "Issue title",
},
body: {
type: "string",
description: "Issue body",
},
},
required: ["owner", "repo", "title"],
},
},
{
name: "list_pull_requests",
description: "List pull requests",
inputSchema: {
type: "object",
properties: {
owner: {
type: "string",
description: "Repository owner",
},
repo: {
type: "string",
description: "Repository name",
},
state: {
type: "string",
enum: ["open", "closed", "all"],
default: "open",
description: "Pull request state",
},
},
required: ["owner", "repo"],
},
},
],
};
});
server.setRequestHandler(CallToolRequestSchema, async (request) => {
const { name, arguments: args } = request.params;
try {
switch (name) {
case "get_issue": {
const issue = await octokit.rest.issues.get({
owner: String(args?.owner),
repo: String(args?.repo),
issue_number: Number(args?.issue_number),
});
return {
content: [
{
type: "text",
text: JSON.stringify(issue.data, null, 2),
},
],
};
}
case "create_issue": {
const newIssue = await octokit.rest.issues.create({
owner: String(args?.owner),
repo: String(args?.repo),
title: String(args?.title),
body: String(args?.body ?? ""),
});
return {
content: [
{
type: "text",
text: `Issue created: #${newIssue.data.number}`,
},
],
};
}
case "list_pull_requests": {
const prs = await octokit.rest.pulls.list({
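owner: String(args?.owner),
repo: String(args?.repo),
// The schema's `default` is advisory; clients may omit the field
state: (args?.state ?? "open") as "open" | "closed" | "all",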
});
return {
content: [
{
type: "text",
text: JSON.stringify(prs.data, null, 2),
},
],
};
}
default:
throw new Error(`Unknown tool: ${name}`);
}
} catch (error) {
const message = error instanceof Error ? error.message : String(error);
throw new Error(`GitHub API call failed: ${message}`);
}
});
const transport = new StdioServerTransport();
server.connect(transport);
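All three servers above rethrow failures as exceptions. The MCP specification also lets a tool report a failure inside its result by setting `isError: true`, which keeps the error text visible to the model so it can react. A minimal sketch of that pattern follows; `ToolResult` and `runTool` are hypothetical names, and the failing call stands in for a real API request.

```typescript
interface ToolResult {
  content: { type: "text"; text: string }[];
  isError?: boolean;
}

// Wrap a tool implementation so failures become `isError` results instead of exceptions.
async function runTool(action: () => Promise<string>): Promise<ToolResult> {
  try {
    return { content: [{ type: "text", text: await action() }] };
  } catch (error) {
    const message = error instanceof Error ? error.message : String(error);
    return {
      content: [{ type: "text", text: `Tool failed: ${message}` }],
      isError: true,
    };
  }
}

// Usage with a stand-in for a failing API call.
runTool(async () => {
  throw new Error("rate limit exceeded");
}).then((result) => console.log(result.isError, result.content[0].text));
```

Exceptions remain appropriate for protocol-level problems such as an unknown tool name; `isError` suits failures that happen while the tool itself runs.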
Integrating with Agent Frameworks
1. Claude Code
# Add the GitHub MCP server
claude mcp add github -- npx -y @modelcontextprotocol/server-github
# Add the file system MCP server (pass the directory it may access)
claude mcp add filesystem -- npx -y @modelcontextprotocol/server-filesystem /path/to/allowed/dir
# Add your own database MCP server
claude mcp add database -- node your-database-server.js
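2. LangChain
# langchain_integration.py
from langchain.agents import Tool, AgentExecutor, create_openai_tools_agent
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder

# read_file_from_mcp, write_file_to_mcp, and query_database_via_mcp are
# placeholders you must implement: each takes a string, forwards the call
# to your MCP client, and returns the result as a string.

# Wrap the MCP calls as LangChain tools
mcp_tools = [
    Tool(
        name="read_file",
        func=read_file_from_mcp,
        description="Read the contents of a file",
    ),
    Tool(
        name="write_file",
        func=write_file_to_mcp,
        description="Write content to a file",
    ),
    Tool(
        name="query_database",
        func=query_database_via_mcp,
        description="Query the database",
    ),
]

# Create the agent
prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a helpful assistant with access to tools."),
    ("user", "{input}"),
    # create_openai_tools_agent requires this slot for intermediate tool calls
    MessagesPlaceholder("agent_scratchpad"),
])
llm = ChatOpenAI(model="gpt-4")
agent = create_openai_tools_agent(llm, mcp_tools, prompt)
agent_executor = AgentExecutor(agent=agent, tools=mcp_tools)

# Use the agent
response = agent_executor.invoke({
    "input": "Read the project's README.md and summarize its main points"
})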
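3. CrewAI
# crewai_integration.py
from typing import Callable

from crewai import Agent, Task, Crew
# Recent CrewAI releases export BaseTool here; older ones used the crewai_tools package
from crewai.tools import BaseTool

# read_file and query_database are placeholders you must implement: each
# forwards the call to your MCP client and returns a string.

# Wrap an MCP call as a CrewAI tool
class MCPTool(BaseTool):
    name: str
    description: str
    func: Callable[[str], str]

    def _run(self, inputs: str) -> str:
        return self.func(inputs)

# Create the agents
research_agent = Agent(
    role="Researcher",
    goal="Research information about the project",
    backstory="You are skilled at analyzing and summarizing information",
    tools=[
        MCPTool(name="read_file", func=read_file, description="Read a file"),
        MCPTool(name="query_db", func=query_database, description="Query the database"),
    ],
    verbose=True,
)
writer_agent = Agent(
    role="Writer",
    goal="Produce high-quality documentation",
    backstory="You are skilled at writing clear technical documentation",
    verbose=True,
)

# Create the tasks (CrewAI requires expected_output on every task)
research_task = Task(
    description="Study the project's README file",
    expected_output="A short summary of the README's key points",
    agent=research_agent,
)
write_task = Task(
    description="Write project documentation based on the research findings",
    expected_output="Project documentation in plain text",
    agent=writer_agent,
)

# Create the crew and run it
crew = Crew(
    agents=[research_agent, writer_agent],
    tasks=[research_task, write_task],
)
result = crew.kickoff()
print(result)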
Advanced Features
1. Resources
// resource-server.ts
import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import {
CallToolRequestSchema,
ListResourcesRequestSchema,
ReadResourceRequestSchema,
} from "@modelcontextprotocol/sdk/types.js";
const server = new Server(
{
name: "resource-server",
version: "1.0.0",
},
{
capabilities: {
tools: {},
resources: {},
},
}
);
server.setRequestHandler(ListResourcesRequestSchema, async () => {
return {
resources: [
{
uri: "resource://config/app-config.json",
name: "App config",
description: "Configuration file for the application",
mimeType: "application/json",
},
{
uri: "resource://templates/email-template.html",
name: "Email template",
description: "HTML email template",
mimeType: "text/html",
},
],
};
});
server.setRequestHandler(ReadResourceRequestSchema, async (request) => {
const { uri } = request.params;
if (uri === "resource://config/app-config.json") {
const config = {
app: {
name: "My App",
version: "1.0.0",
},
database: {
host: "localhost",
port: 5432,
},
};
return {
contents: [
{
uri,
text: JSON.stringify(config, null, 2),
mimeType: "application/json",
},
],
};
}
throw new Error(`Unknown resource: ${uri}`);
});
const transport = new StdioServerTransport();
server.connect(transport);
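In the listing above, the resource list advertises an email template that the read handler never serves, so the two can drift apart. One way to avoid that, sketched here, is to drive both handlers from a single table. The `ResourceEntry` type, the `listResources` and `readResource` helpers, and the template body are illustrative assumptions.

```typescript
interface ResourceEntry {
  name: string;
  mimeType: string;
  load: () => string;
}

// One table drives both listing and reading, so the two cannot drift apart.
const resources = new Map<string, ResourceEntry>([
  [
    "resource://config/app-config.json",
    {
      name: "App config",
      mimeType: "application/json",
      load: () => JSON.stringify({ app: { name: "My App", version: "1.0.0" } }, null, 2),
    },
  ],
  [
    "resource://templates/email-template.html",
    {
      name: "Email template",
      mimeType: "text/html",
      load: () => "<p>Hello, {{name}}</p>", // placeholder body
    },
  ],
]);

// Shape of a resources/list result.
function listResources() {
  return {
    resources: Array.from(resources.entries()).map(([uri, entry]) => ({
      uri,
      name: entry.name,
      mimeType: entry.mimeType,
    })),
  };
}

// Shape of a resources/read result.
function readResource(uri: string) {
  const entry = resources.get(uri);
  if (!entry) {
    throw new Error(`Unknown resource: ${uri}`);
  }
  return { contents: [{ uri, mimeType: entry.mimeType, text: entry.load() }] };
}

console.log(listResources().resources.map((r) => r.uri));
```

The two request handlers then reduce to calls to `listResources()` and `readResource(request.params.uri)`.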
2. Prompt templates
// prompt-server.ts
import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import {
CallToolRequestSchema,
ListPromptsRequestSchema,
GetPromptRequestSchema,
} from "@modelcontextprotocol/sdk/types.js";
const server = new Server(
{
name: "prompt-server",
version: "1.0.0",
},
{
capabilities: {
tools: {},
prompts: {},
},
}
);
server.setRequestHandler(ListPromptsRequestSchema, async () => {
return {
prompts: [
{
name: "code-review",
description: "Code review prompt",
arguments: [
{
name: "code",
description: "The code to review",
required: true,
},
],
},
{
name: "summarize",
description: "Text summarization prompt",
arguments: [
{
name: "text",
description: "The text to summarize",
required: true,
},
],
},
],
};
});
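server.setRequestHandler(GetPromptRequestSchema, async (request) => {
const { name, arguments: args } = request.params;
switch (name) {
case "code-review":
return {
description: "Code review prompt",
messages: [
{
role: "user",
// Message content is a typed object, not a bare string
content: {
type: "text",
text: `Review the following code for:
1. Code quality and readability
2. Potential bugs
3. Performance improvements
4. Security issues
Code:
\`\`\`
${args?.code}
\`\`\`
Provide a detailed review report.`,
},
},
],
};
case "summarize":
return {
description: "Text summarization prompt",
messages: [
{
role: "user",
content: {
type: "text",
text: `Summarize the following text and extract the key information:
${args?.text}
Requirements:
1. Be concise
2. Highlight the main points
3. Preserve the original meaning`,
},
},
],
};
default:
throw new Error(`Unknown prompt: ${name}`);
}
});
const transport = new StdioServerTransport();
server.connect(transport);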
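The prompt handler declares `required: true` for its arguments but never checks them, so a missing value ends up in the prompt as the text `undefined`. A minimal sketch of checking required arguments before filling a template follows; `renderPrompt` and the `{{name}}` placeholder syntax are assumptions made for this example, not part of MCP.

```typescript
interface PromptArgument {
  name: string;
  required?: boolean;
}

// Fill `{{name}}` placeholders after checking that every required argument is present.
function renderPrompt(
  template: string,
  declared: PromptArgument[],
  args: Record<string, string> = {},
): string {
  for (const arg of declared) {
    if (arg.required && !(arg.name in args)) {
      throw new Error(`Missing required argument: ${arg.name}`);
    }
  }
  // Unknown placeholders are replaced with an empty string.
  return template.replace(/\{\{(\w+)\}\}/g, (_match, key: string) => args[key] ?? "");
}

const promptText = renderPrompt(
  "Summarize the following text:\n{{text}}",
  [{ name: "text", required: true }],
  { text: "MCP standardizes tool access." },
);
console.log(promptText);
```

Reusing the same `declared` array in the list handler keeps the advertised arguments and the validation in step.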
Testing and Debugging
1. Unit tests
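// server.test.ts
import { test, expect } from "vitest";
import { Client } from "@modelcontextprotocol/sdk/client/index.js";
import { InMemoryTransport } from "@modelcontextprotocol/sdk/inMemory.js";
// Assumes server.ts exports its Server instance as `server` and does not
// connect to stdio when imported.
import { server } from "./server";
test("hello tool", async () => {
  // Link a client and the server through an in-memory transport pair
  const [clientTransport, serverTransport] = InMemoryTransport.createLinkedPair();
  const client = new Client({ name: "test-client", version: "1.0.0" }, { capabilities: {} });
  await Promise.all([
    server.connect(serverTransport),
    client.connect(clientTransport),
  ]);
  const response = await client.callTool({
    name: "hello",
    arguments: { name: "World" },
  });
  const content = response.content as Array<{ type: string; text: string }>;
  expect(content[0].text).toContain("World");
});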
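2. Integration tests
# Exercise the server interactively with the MCP Inspector
npx @modelcontextprotocol/inspector node dist/server.js
# Register the server with Claude Code, then confirm that it is listed
claude mcp add github-server -- npx tsx github-server.ts
claude mcp list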
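3. Debugging tips
# Enable debug output (has an effect only if the server uses a DEBUG-aware logger)
DEBUG=mcp:* node server.js
# Check which MCP servers Claude Code has registered
claude mcp list
# Call a single tool from the command line with the Inspector's CLI mode
npx @modelcontextprotocol/inspector --cli node dist/server.js --method tools/call --tool-name hello --tool-arg name=Test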
Deployment and Publishing
1. Package the MCP server
// package.json
{
"name": "my-mcp-server",
"version": "1.0.0",
"bin": {
"my-mcp-server": "dist/index.js"
},
"scripts": {
"build": "tsc",
"start": "node dist/index.js",
"test": "vitest"
},
"dependencies": {
"@modelcontextprotocol/sdk": "^1.0.0"
},
"devDependencies": {
"typescript": "^5.0.0",
"vitest": "^1.0.0"
}
}
2. Publish to npm
# Build the project
npm run build
# Publish to npm
npm publish
# Install and use
npm install -g my-mcp-server
claude mcp add my-server my-mcp-server
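3. Docker deployment
# Dockerfile
FROM node:18-alpine
WORKDIR /app
COPY package*.json ./
# The build needs devDependencies (TypeScript), so install everything first
RUN npm ci
COPY . .
# Build, then drop devDependencies from the final image
RUN npm run build && npm prune --omit=dev
# A stdio server listens on no port, so no EXPOSE is needed; run it with `docker run -i`
CMD ["node", "dist/index.js"]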
Performance Optimization
1. Caching
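// cached-server.ts
import { LRUCache } from "lru-cache";
import { CallToolRequestSchema, type CallToolResult } from "@modelcontextprotocol/sdk/types.js";
// Assumes `server` is the Server instance created earlier, and that
// executeTool (a placeholder) runs the requested tool and returns its result.
const cache = new LRUCache<string, CallToolResult>({
  max: 100,
  ttl: 1000 * 60 * 5, // 5 minutes
});
server.setRequestHandler(CallToolRequestSchema, async (request) => {
  // Cache only idempotent, read-only tools; never cache writes
  const cacheKey = `${request.params.name}-${JSON.stringify(request.params.arguments)}`;
  const cached = cache.get(cacheKey);
  if (cached) {
    return cached;
  }
  const result = await executeTool(request);
  cache.set(cacheKey, result);
  return result;
});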
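The cache key above depends on `JSON.stringify`, which follows property insertion order, so `{a: 1, b: 2}` and `{b: 2, a: 1}` produce different keys for the same call. A minimal sketch of an order-independent key follows; `stableStringify` and `cacheKey` are hypothetical helpers.

```typescript
// Serialize a value with object keys sorted, so equal arguments give equal keys.
function stableStringify(value: unknown): string {
  if (Array.isArray(value)) {
    return `[${value.map(stableStringify).join(",")}]`;
  }
  if (value !== null && typeof value === "object") {
    const record = value as Record<string, unknown>;
    const entries = Object.keys(record)
      .sort()
      .map((key) => `${JSON.stringify(key)}:${stableStringify(record[key])}`);
    return `{${entries.join(",")}}`;
  }
  // JSON.stringify returns undefined for undefined input; fall back to "null"
  return JSON.stringify(value) ?? "null";
}

// Build a cache key from the tool name and its arguments.
function cacheKey(toolName: string, args: unknown): string {
  return `${toolName}:${stableStringify(args)}`;
}

console.log(cacheKey("query", { sql: "SELECT 1", params: [] }));
```

Array order is preserved on purpose, since positional query parameters are order-sensitive.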
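2. Batch processing
// batch-server.ts
// Assumes `server` is the Server instance created earlier, and that
// processItem (a placeholder) handles a single item.
server.setRequestHandler(CallToolRequestSchema, async (request) => {
  const { name, arguments: args } = request.params;
  if (name === "batch_process") {
    const rawItems = args?.items;
    const items = Array.isArray(rawItems) ? rawItems : [];
    // Process all items concurrently and collect the results in order
    const results = await Promise.all(items.map((item) => processItem(item)));
    return {
      content: [
        {
          type: "text",
          text: JSON.stringify(results, null, 2),
        },
      ],
    };
  }
  throw new Error(`Unknown tool: ${name}`);
});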
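`Promise.all` starts every item at once, which can overwhelm a database or trip an API rate limit when the batch is large. A minimal sketch of capping concurrency follows; `mapWithLimit` is a hypothetical helper, and the worker shown is a stand-in for real per-item work.

```typescript
// Run `worker` over `items` with at most `limit` calls in flight, preserving order.
async function mapWithLimit<T, R>(
  items: readonly T[],
  limit: number,
  worker: (item: T) => Promise<R>,
): Promise<R[]> {
  const results = new Array<R>(items.length);
  let next = 0;

  // Each lane repeatedly claims the next unprocessed index until none remain.
  async function runLane(): Promise<void> {
    while (next < items.length) {
      const index = next++;
      results[index] = await worker(items[index]);
    }
  }

  const laneCount = Math.max(1, Math.min(limit, items.length));
  await Promise.all(Array.from({ length: laneCount }, () => runLane()));
  return results;
}

// Usage: process four items, two at a time.
mapWithLimit([1, 2, 3, 4], 2, async (n) => n * 10).then((out) => console.log(out));
```

Like `Promise.all`, this rejects on the first failure; a batch tool that should report partial results would catch errors inside the worker.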
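Summary
MCP gives the agent ecosystem a powerful way to integrate tools. Having worked through this guide, you have covered:
- Basic MCP server development
- Practical server implementations (file system, database, GitHub)
- Integration with mainstream agent frameworks
- Advanced features (resources, prompt templates)
- Testing, debugging, and deployment
Suggested next steps:
- Implement an MCP server for a specific domain
- Wrap an existing service as an MCP server
- Contribute to the MCP community
Remember that the core value of MCP is standardized tool access, which lets agents extend their capabilities with little effort.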