返回资料库 打底:概念与协议

MCP 开发实战指南(2026)

Model Context Protocol (MCP) 是 2026 年 Agent 生态中最关键的标准之一,它为 LLM 和外部工具之间提供了统一的接口。本指南将带你从零开始构建 MCP 服务器,并集成到各种 Agent 框架中。

来源:MCP 官方文档 · GitHub 组织 | 整理时间:2026-04-19


概述

Model Context Protocol (MCP) 是 2026 年 Agent 生态中最关键的标准之一,它为 LLM 和外部工具之间提供了统一的接口。本指南将带你从零开始构建 MCP 服务器,并集成到各种 Agent 框架中。

MCP 核心概念

组件 说明 实现方式
Tools Agent 可以调用的函数 定义输入输出和执行逻辑
Resources Agent 可以访问的数据源 提供数据读取接口
Prompts 预定义的提示词模板 管理复杂的提示词结构
Servers 实现 MCP 协议的服务 处理连接和消息路由

快速开始

1. 安装 MCP 开发环境

# 安装 MCP CLI
npm install -g @modelcontextprotocol/cli

# 安装 TypeScript SDK
npm install @modelcontextprotocol/sdk

# 初始化项目
npm init @modelcontextprotocol/server

2. 创建第一个 MCP 服务器

// server.ts
import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import {
  CallToolRequestSchema,
  ListToolsRequestSchema,
} from "@modelcontextprotocol/sdk/types.js";

const server = new Server(
  {
    name: "hello-world-server",
    version: "1.0.0",
  },
  {
    capabilities: {
      tools: {},
    },
  }
);

// 定义工具
server.setRequestHandler(ListToolsRequestSchema, async () => {
  return {
    tools: [
      {
        name: "hello",
        description: "返回问候语",
        inputSchema: {
          type: "object",
          properties: {
            name: {
              type: "string",
              description: "姓名",
            },
          },
          required: ["name"],
        },
      },
    ],
  };
});

// 实现工具
server.setRequestHandler(CallToolRequestSchema, async (request) => {
  const { name, arguments: args } = request.params;

  if (name === "hello") {
    const greeting = `你好,${args.name}!欢迎使用 MCP 开发。`;
    return {
      content: [
        {
          type: "text",
          text: greeting,
        },
      ],
    };
  }

  throw new Error(`未知工具: ${name}`);
});

// 启动服务器
const transport = new StdioServerTransport();
server.connect(transport);
console.log("MCP 服务器启动成功");

3. 运行服务器

# 运行服务器
npx tsx server.ts

# 或者编译后运行
npm run build
node dist/server.js

实用 MCP 服务器开发

1. 文件系统服务器

// filesystem-server.ts
import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import {
  CallToolRequestSchema,
  ListToolsRequestSchema,
} from "@modelcontextprotocol/sdk/types.js";
import { promises as fs } from "fs";
import path from "path";

const server = new Server(
  {
    name: "filesystem-server",
    version: "1.0.0",
  },
  {
    capabilities: {
      tools: {},
    },
  }
);

// 文件操作工具
server.setRequestHandler(ListToolsRequestSchema, async () => {
  return {
    tools: [
      {
        name: "read_file",
        description: "读取文件内容",
        inputSchema: {
          type: "object",
          properties: {
            path: {
              type: "string",
              description: "文件路径",
            },
          },
          required: ["path"],
        },
      },
      {
        name: "write_file",
        description: "写入文件",
        inputSchema: {
          type: "object",
          properties: {
            path: {
              type: "string",
              description: "文件路径",
            },
            content: {
              type: "string",
              description: "文件内容",
            },
          },
          required: ["path", "content"],
        },
      },
      {
        name: "list_directory",
        description: "列出目录内容",
        inputSchema: {
          type: "object",
          properties: {
            path: {
              type: "string",
              description: "目录路径",
            },
          },
          required: ["path"],
        },
      },
    ],
  };
});

server.setRequestHandler(CallToolRequestSchema, async (request) => {
  const { name, arguments: args } = request.params;

  try {
    switch (name) {
      case "read_file": {
        const content = await fs.readFile(args.path, "utf-8");
        return {
          content: [
            {
              type: "text",
              text: content,
            },
          ],
        };
      }

      case "write_file": {
        await fs.writeFile(args.path, args.content, "utf-8");
        return {
          content: [
            {
              type: "text",
              text: `文件已成功写入: ${args.path}`,
            },
          ],
        };
      }

      case "list_directory": {
        const items = await fs.readdir(args.path);
        const details = await Promise.all(
          items.map(async (item) => {
            const fullPath = path.join(args.path, item);
            const stats = await fs.stat(fullPath);
            return {
              name: item,
              path: fullPath,
              isDirectory: stats.isDirectory(),
              size: stats.size,
              modified: stats.mtime.toISOString(),
            };
          })
        );
        return {
          content: [
            {
              type: "text",
              text: JSON.stringify(details, null, 2),
            },
          ],
        };
      }

      default:
        throw new Error(`未知工具: ${name}`);
    }
  } catch (error) {
    throw new Error(`操作失败: ${error.message}`);
  }
});

const transport = new StdioServerTransport();
server.connect(transport);

2. 数据库 MCP 服务器

// database-server.ts
import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import {
  CallToolRequestSchema,
  ListToolsRequestSchema,
} from "@modelcontextprotocol/sdk/types.js";
import { Database } from "sqlite3";
import { open } from "sqlite";
import { sqlite3 } from "sqlite3";

const server = new Server(
  {
    name: "database-server",
    version: "1.0.0",
  },
  {
    capabilities: {
      tools: {},
    },
  }
);

// 初始化数据库连接
let db: Database;

async function initDatabase() {
  db = await open({
    filename: "./database.db",
    driver: sqlite3.Database,
  });
  
  // 创建示例表
  await db.exec(`
    CREATE TABLE IF NOT EXISTS users (
      id INTEGER PRIMARY KEY AUTOINCREMENT,
      name TEXT NOT NULL,
      email TEXT UNIQUE NOT NULL,
      created_at DATETIME DEFAULT CURRENT_TIMESTAMP
    )
  `);
}

server.setRequestHandler(ListToolsRequestSchema, async () => {
  return {
    tools: [
      {
        name: "query",
        description: "执行 SQL 查询",
        inputSchema: {
          type: "object",
          properties: {
            sql: {
              type: "string",
              description: "SQL 查询语句",
            },
            params: {
              type: "array",
              items: {
                type: "string",
              },
              description: "查询参数",
            },
          },
          required: ["sql"],
        },
      },
      {
        name: "insert",
        description: "插入数据",
        inputSchema: {
          type: "object",
          properties: {
            table: {
              type: "string",
              description: "表名",
            },
            data: {
              type: "object",
              description: "要插入的数据",
              additionalProperties: {
                type: "string",
              },
            },
          },
          required: ["table", "data"],
        },
      },
    ],
  };
});

server.setRequestHandler(CallToolRequestSchema, async (request) => {
  const { name, arguments: args } = request.params;

  if (!db) {
    await initDatabase();
  }

  try {
    switch (name) {
      case "query": {
        const stmt = await db.prepare(args.sql);
        const result = args.params 
          ? await stmt.all(...args.params)
          : await stmt.all();
        return {
          content: [
            {
              type: "text",
              text: JSON.stringify(result, null, 2),
            },
          ],
        };
      }

      case "insert": {
        const columns = Object.keys(args.data);
        const placeholders = columns.map(() => "?").join(", ");
        const values = Object.values(args.data);
        
        const sql = `INSERT INTO ${args.table} (${columns.join(", ")}) VALUES (${placeholders})`;
        await db.run(sql, values);
        
        return {
          content: [
            {
              type: "text",
              text: `数据已插入到 ${args.table}`,
            },
          ],
        };
      }

      default:
        throw new Error(`未知工具: ${name}`);
    }
  } catch (error) {
    throw new Error(`数据库操作失败: ${error.message}`);
  }
});

const transport = new StdioServerTransport();
server.connect(transport);

3. GitHub 集成服务器

// github-server.ts
import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import {
  CallToolRequestSchema,
  ListToolsRequestSchema,
} from "@modelcontextprotocol/sdk/types.js";
import { Octokit } from "@octokit/rest";

const server = new Server(
  {
    name: "github-server",
    version: "1.0.0",
  },
  {
    capabilities: {
      tools: {},
    },
  }
);

const octokit = new Octokit({
  auth: process.env.GITHUB_TOKEN,
});

server.setRequestHandler(ListToolsRequestSchema, async () => {
  return {
    tools: [
      {
        name: "get_issue",
        description: "获取 Issue 详情",
        inputSchema: {
          type: "object",
          properties: {
            owner: {
              type: "string",
              description: "仓库所有者",
            },
            repo: {
              type: "string",
              description: "仓库名称",
            },
            issue_number: {
              type: "number",
              description: "Issue 编号",
            },
          },
          required: ["owner", "repo", "issue_number"],
        },
      },
      {
        name: "create_issue",
        description: "创建新 Issue",
        inputSchema: {
          type: "object",
          properties: {
            owner: {
              type: "string",
              description: "仓库所有者",
            },
            repo: {
              type: "string",
              description: "仓库名称",
            },
            title: {
              type: "string",
              description: "Issue 标题",
            },
            body: {
              type: "string",
              description: "Issue 内容",
            },
          },
          required: ["owner", "repo", "title"],
        },
      },
      {
        name: "list_pull_requests",
        description: "列出 Pull Requests",
        inputSchema: {
          type: "object",
          properties: {
            owner: {
              type: "string",
              description: "仓库所有者",
            },
            repo: {
              type: "string",
              description: "仓库名称",
            },
            state: {
              type: "string",
              enum: ["open", "closed", "all"],
              default: "open",
              description: "PR 状态",
            },
          },
          required: ["owner", "repo"],
        },
      },
    ],
  };
});

server.setRequestHandler(CallToolRequestSchema, async (request) => {
  const { name, arguments: args } = request.params;

  try {
    switch (name) {
      case "get_issue": {
        const issue = await octokit.rest.issues.get({
          owner: args.owner,
          repo: args.repo,
          issue_number: args.issue_number,
        });
        
        return {
          content: [
            {
              type: "text",
              text: JSON.stringify(issue.data, null, 2),
            },
          ],
        };
      }

      case "create_issue": {
        const newIssue = await octokit.rest.issues.create({
          owner: args.owner,
          repo: args.repo,
          title: args.title,
          body: args.body || "",
        });
        
        return {
          content: [
            {
              type: "text",
              text: `Issue 已创建: #${newIssue.data.number}`,
            },
          ],
        };
      }

      case "list_pull_requests": {
        const prs = await octokit.rest.pulls.list({
          owner: args.owner,
          repo: args.repo,
          state: args.state,
        });
        
        return {
          content: [
            {
              type: "text",
              text: JSON.stringify(prs.data, null, 2),
            },
          ],
        };
      }

      default:
        throw new Error(`未知工具: ${name}`);
    }
  } catch (error) {
    throw new Error(`GitHub API 调用失败: ${error.message}`);
  }
});

const transport = new StdioServerTransport();
server.connect(transport);

集成到 Agent 框架

1. 集成到 Claude Code

# 添加 GitHub MCP 服务器
claude mcp add github npx @modelcontextprotocol/server-github

# 添加文件系统 MCP 服务器
claude mcp add filesystem npx @modelcontextprotocol/server-filesystem

# 添加数据库 MCP 服务器
claude mcp add database node your-database-server.js

2. 集成到 LangChain

# langchain_integration.py
from langchain.agents import Tool, AgentExecutor, create_openai_tools_agent
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate

# 定义 MCP 工具
mcp_tools = [
    Tool(
        name="read_file",
        func=read_file_from_mcp,
        description="读取文件内容"
    ),
    Tool(
        name="write_file", 
        func=write_file_to_mcp,
        description="写入文件"
    ),
    Tool(
        name="query_database",
        func=query_database_via_mcp,
        description="查询数据库"
    )
]

# 创建 Agent
prompt = ChatPromptTemplate.from_messages([
    ("system", "你是一个智能助手,可以使用以下工具:"),
    ("user", "{input}")
])

llm = ChatOpenAI(model="gpt-4")
agent = create_openai_tools_agent(llm, mcp_tools, prompt)
agent_executor = AgentExecutor(agent=agent, tools=mcp_tools)

# 使用 Agent
response = agent_executor.invoke({
    "input": "读取项目中的 README.md 文件并总结主要内容"
})

3. 集成到 CrewAI

# crewai_integration.py
from crewai import Agent, Task, Crew
from langchain_openai import ChatOpenAI

# 定义 MCP 工具
class MCPTool:
    def __init__(self, name, func, description):
        self.name = name
        self.func = func
        self.description = description
    
    def run(self, inputs):
        return self.func(inputs)

# 创建 Agents
research_agent = Agent(
    role="研究员",
    goal="研究项目信息",
    backstory="你擅长分析和总结信息",
    tools=[
        MCPTool("read_file", read_file, "读取文件"),
        MCPTool("query_db", query_database, "查询数据库")
    ],
    verbose=True
)

writer_agent = Agent(
    role="作家",
    goal="生成高质量文档",
    backstory="你擅长编写清晰的技术文档",
    verbose=True
)

# 创建 Tasks
research_task = Task(
    description="研究项目 README 文件",
    agent=research_agent
)

write_task = Task(
    description="基于研究结果生成项目文档",
    agent=writer_agent
)

# 创建 Crew 并执行
crew = Crew(
    agents=[research_agent, writer_agent],
    tasks=[research_task, write_task]
)

result = crew.kickoff()
print(result)

高级特性

1. 资源管理

// resource-server.ts
import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import {
  CallToolRequestSchema,
  ListResourcesRequestSchema,
  ReadResourceRequestSchema,
} from "@modelcontextprotocol/sdk/types.js";

const server = new Server(
  {
    name: "resource-server",
    version: "1.0.0",
  },
  {
    capabilities: {
      tools: {},
      resources: {},
    },
  }
);

server.setRequestHandler(ListResourcesRequestSchema, async () => {
  return {
    resources: [
      {
        uri: "resource://config/app-config.json",
        name: "应用配置",
        description: "应用程序的配置文件",
        mimeType: "application/json",
      },
      {
        uri: "resource://templates/email-template.html",
        name: "邮件模板",
        description: "HTML 邮件模板",
        mimeType: "text/html",
      },
    ],
  };
});

server.setRequestHandler(ReadResourceRequestSchema, async (request) => {
  const { uri } = request.params;
  
  if (uri === "resource://config/app-config.json") {
    const config = {
      app: {
        name: "My App",
        version: "1.0.0",
      },
      database: {
        host: "localhost",
        port: 5432,
      },
    };
    
    return {
      contents: [
        {
          uri,
          text: JSON.stringify(config, null, 2),
          mimeType: "application/json",
        },
      ],
    };
  }
  
  throw new Error(`未知资源: ${uri}`);
});

2. 提示词模板

// prompt-server.ts
import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import {
  CallToolRequestSchema,
  ListPromptsRequestSchema,
  GetPromptRequestSchema,
} from "@modelcontextprotocol/sdk/types.js";

const server = new Server(
  {
    name: "prompt-server",
    version: "1.0.0",
  },
  {
    capabilities: {
      tools: {},
      prompts: {},
    },
  }
);

server.setRequestHandler(ListPromptsRequestSchema, async () => {
  return {
    prompts: [
      {
        name: "code-review",
        description: "代码审查提示词",
        arguments: [
          {
            name: "code",
            description: "要审查的代码",
            required: true,
          },
        ],
      },
      {
        name: "summarize",
        description: "文本总结提示词",
        arguments: [
          {
            name: "text",
            description: "要总结的文本",
            required: true,
          },
        ],
      },
    ],
  };
});

server.setRequestHandler(GetPromptRequestSchema, async (request) => {
  const { name, arguments: args } = request.params;
  
  switch (name) {
    case "code-review":
      return {
        description: "代码审查提示词",
        messages: [
          {
            role: "user",
            content: `请审查以下代码,检查以下方面:
1. 代码质量和可读性
2. 潜在的 bug
3. 性能优化建议
4. 安全性问题

代码:
\`\`\`${args.code}\`\`\`

请提供详细的审查报告。`,
          },
        ],
      };
      
    case "summarize":
      return {
        description: "文本总结提示词",
        messages: [
          {
            role: "user",
            content: `请总结以下文本,提取关键信息:

${args.text}

总结要求:
1. 简洁明了
2. 突出重点
3. 保持原意`,
          },
        ],
      };
      
    default:
      throw new Error(`未知提示词: ${name}`);
  }
});

测试和调试

1. 单元测试

// server.test.ts
import { test, expect } from "vitest";
import { MCPTester } from "@modelcontextprotocol/sdk/test";
import { Server } from "./server";

test("hello tool", async () => {
  const server = new Server(/* ... */);
  const tester = new MCPTester(server);
  
  const response = await tester.callTool("hello", {
    name: "世界",
  });
  
  expect(response.content[0].text).toContain("你好,世界");
});

2. 集成测试

# 测试 MCP 服务器
mcp test server.ts

# 测试与 Claude Code 的集成
claude mcp test github-server.ts

3. 调试技巧

# 启用调试模式
DEBUG=mcp:* node server.js

# 查看 MCP 连接状态
claude mcp status

# 测试单个工具
claude mcp test --tool hello --args '{"name": "测试"}'

部署和发布

1. 打包 MCP 服务器

// package.json
{
  "name": "my-mcp-server",
  "version": "1.0.0",
  "bin": {
    "my-mcp-server": "dist/index.js"
  },
  "scripts": {
    "build": "tsc",
    "start": "node dist/index.js",
    "test": "vitest"
  },
  "dependencies": {
    "@modelcontextprotocol/sdk": "^1.0.0"
  },
  "devDependencies": {
    "typescript": "^5.0.0",
    "vitest": "^1.0.0"
  }
}

2. 发布到 NPM

# 构建项目
npm run build

# 发布到 NPM
npm publish

# 安装使用
npm install -g my-mcp-server
claude mcp add my-server my-mcp-server

3. Docker 部署

# Dockerfile
FROM node:18-alpine

WORKDIR /app
COPY package*.json ./
RUN npm ci --only=production

COPY . .
RUN npm run build

EXPOSE 3000
CMD ["node", "dist/index.js"]

性能优化

1. 缓存策略

// cached-server.ts
import { LRUCache } from "lru-cache";

const cache = new LRUCache({
  max: 100,
  ttl: 1000 * 60 * 5, // 5 分钟
});

server.setRequestHandler(CallToolRequestSchema, async (request) => {
  const cacheKey = `${request.params.name}-${JSON.stringify(request.params.arguments)}`;
  
  if (cache.has(cacheKey)) {
    return cache.get(cacheKey);
  }
  
  const result = await /* 执行操作 */;
  cache.set(cacheKey, result);
  return result;
});

2. 批量处理

// batch-server.ts
server.setRequestHandler(CallToolRequestSchema, async (request) => {
  const { name, arguments: args } = request.params;
  
  if (name === "batch_process") {
    const results = await Promise.all(
      args.items.map(async (item) => {
        return await processItem(item);
      })
    );
    
    return {
      content: [
        {
          type: "text",
          text: JSON.stringify(results, null, 2),
        },
      ],
    };
  }
});

总结

MCP 开发为 Agent 生态提供了强大的工具集成能力。通过学习本指南,你已经掌握了:

  1. 基础 MCP 服务器开发
  2. 实用服务器实现(文件系统、数据库、GitHub)
  3. 集成到主流 Agent 框架
  4. 高级功能(资源管理、提示词模板)
  5. 测试、调试和部署

下一步建议:

  • 尝试实现一个特定领域的 MCP 服务器
  • 将现有服务集成为 MCP 服务器
  • 参与 MCP 社区贡献

记住,MCP 的核心价值在于标准化工具接入,让 Agent 能够轻松扩展能力边界。

相关链接

常见问题

MCP 开发实战指南(2026) 适合什么读者?

想搞清楚 打底 这一块的人,尤其是从概念跨到能动手做的过渡阶段。页面带摘要、同主题延伸和来源链接,你不用再去满网找。

阅读 MCP 开发实战指南(2026) 需要多久?

预计 32 分钟。看不下去就跳到结论部分,把感兴趣的小节做个标记,后面再回来。

怎么把 MCP 开发实战指南(2026) 用在自己的项目里?

挑一个文章里的最小例子先跑通,跑通之后再拿到自己的项目里。卡住的地方对照来源链接核验细节,部署和评估的部分可以从同主题其他文章补。