团团虾导读:RDMA 数据传输之前必须经过握手建立连接。这篇完整追踪了 Mooncake 的 TCP+JSON 握手协议,逐字段拆解 HandShakeDesc 结构体,然后深入 QP 状态机的每一步转换——从硬件获取 LID/GID 到 ibv_modify_qp 的四个阶段,以及每阶段的失败处理和 MTU 协商机制。排查 RDMA 连接不通问题时必读。
阅读版本: Mooncake v0.3.10.post2-104-geaf724ab (commit eaf724ab, 2026-05-20)
Mooncake Transfer Engine 握手协议与 RDMA QP 状态机深度解析
目录
- 握手机制全景图
- HandShakeDesc:握手数据包的字段定义
- LID/GID/QP 号的来源与分发机制
- QP 状态机:RESET to INIT to RTR to RTS
- RTR 与 RTS 之间:有无数据探测?各阶段失败怎么办
- INIT to RTR 失败时如何知道 MTU 不匹配 / GID 路由无效
- 握手的 RPC 通信框架:裸 TCP Socket + JSON
- 各后端握手差异对比
- 代码验证
1. 握手机制全景图
Mooncake Transfer Engine 的握手机制是一套运行在 RDMA 控制面的协议,负责在本地 RNIC(RDMA 网卡)和对端 RNIC 之间建立可靠的 Queue Pair(QP)连接。整体流程图如下:
主动端 (Active) 被动端 (Passive)
┌──────────────────┐ ┌──────────────────┐
│ RdmaEndPoint │ │ RdmaEndPoint │
│ │ │ │
│ 1.获取本地LID/GID │ │ 1.启动RPC监听线程 │
│ 2.生成本地QP号 │ │ 2.等待握手RPC请求 │
│ 3.构造HandShake │ TCP+JSON │ │
│ Desc │ ←──────────────────→ │ 3.收到对端QP信息 │
│ │ sendHandshake() │ 4.构造本地QP信息 │
│ 4.收到对端信息后 │ │ 5.返回HandShake │
│ 执行doSetupConn │ │ Desc │
│ (RESET→INIT │ │ 6.执行doSetupConn │
│ →RTR→RTS) │ │ (同样四步转换) │
│ │ │ │
│ 5.RDMA数据传输 │ ←─── RDMA Read/Write ─→│ 5.RDMA数据传输 │
└──────────────────┘ └──────────────────┘
核心类关系图:
TransferMetadata (握手协议编排层)
├── HandShakeDesc (握手数据包)
├── HandShakePlugin (插件接口)
│ └── SocketHandShakePlugin (TCP+JSON 实现)
└── HandShakeDaemon (监听线程)
RdmaTransport (RDMA传输层)
├── RdmaContext (每个RNIC一个)
│ ├── LID / GID (通过 ibv_query_* 获取)
│ ├── EndpointStore (QP端点缓存)
│ └── CQ / Memory Region
└── RdmaEndPoint (每个对端一个)
├── QP_list_ (多个QP,默认2个)
├── doSetupConnection() (状态机转换)
└── submitPostSend() (数据面提交)
2. HandShakeDesc:握手数据包的字段定义
HandShakeDesc 结构体定义在 include/transfer_metadata.h:119:
struct HandShakeDesc {
std::string local_nic_path; // 本地 NIC 路径,格式: "server_name@nic_name"
uint16_t local_lid = 0; // 本地 LID (Local Identifier)
std::string local_gid; // 本地 GID (Global Identifier), 16字节hex编码
std::string peer_nic_path; // 对端 NIC 路径
#ifdef USE_UB
std::vector<uint32_t> jetty_num; // 鲲鹏UB/URMA传输jetty号
#endif
#ifdef USE_BAREX
uint16_t barex_port; // Barex 传输端口
#endif
std::vector<uint32_t> qp_num; // 本地 QP 号码列表
std::string reply_msg; // 错误回复消息(握手失败时填充)
#ifdef USE_EFA
std::string efa_addr; // EFA endpoint 地址 (hex编码)
#endif
};
字段详解
local_nic_path 和 peer_nic_path:NIC 路径格式为 "server_name@nic_name"(如 "192.168.3.76@mlx5_3")。server_name 通常为 IP 地址或主机名,nic_name 为 RDMA 设备名。在 RdmaContext::nicPath() 中由 MakeNicPath(engine_.local_server_name_, device_name_) 生成。
local_lid:16位 Local Identifier,由硬件分配。在 RdmaContext::openRdmaDevice() 中通过 ibv_query_port() 获取 port_attr.lid,存入 RdmaContext::lid_。
local_gid:128位 Global Identifier 的 hex 字符串表示。在 RdmaContext::gid() 中由 gid_ 字段的 16 字节 raw 数据通过 sprintf(buf, "%02x", ...) 转换而来,字节间以 : 分隔(如 "fe:80:00:00:00:00:...")。
qp_num:向量类型,每个 QP 有一个 32 位号码。由 RdmaEndPoint::qpNum() 从 qp_list_[i]->qp_num 读取。QP 由 ibv_create_qp() 创建时硬件分配。
注意字段布局:在实际代码中,#ifdef USE_UB 和 #ifdef USE_BAREX 条件编译块位于 peer_nic_path 和 qp_num 之间,#ifdef USE_EFA 块位于 reply_msg 之后。JSON 序列化使用命名键,因此字段顺序不影响序列化结果,但对于理解源码中 ABI/缓存行布局有意义。
barex_port:仅在 USE_BAREX 编译宏开启时有效,用于 Barex 自定义传输协议。Barex 不走 Verbs API,而是通过 XFabric 库进行 RDMA 通信。
efa_addr:仅在 USE_EFA 编译宏开启时有效。EFA(Elastic Fabric Adapter,AWS)使用 libfabric 而非 Verbs API,端点的唯一标识是一个二进制地址,通过 fi_getname() 获取后 hex 编码。
3. LID/GID/QP 号的来源与分发机制
3.1 LID(Local Identifier)
**谁决定的?**硬件(交换机/子网管理器)。
谁获取的?RdmaContext 在初始化时通过 ibv_query_port() 获取。
// rdma_context.cpp:623
ibv_port_attr attr;
int ret = ibv_query_port(context, port, &attr);
// ...
lid_ = attr.lid; // 805行
LID 是 InfiniBand 子网内的本地地址,由子网管理器(Subnet Manager)在子网初始化和拓扑发现过程中分配。对于 RoCE(RDMA over Converged Ethernet),LID 通常为 0(因为 RoCE 不使用 LID 路由),实际路由依赖 GID。
3.2 GID(Global Identifier)
**谁决定的?**结合硬件能力和配置。IPv4 GID 基于网卡 IP 通过 IPv4-mapped IPv6 格式生成。
谁获取的?RdmaContext 通过智能选择算法 findBestGidIndex() 自动选择最佳 GID。
选择优先级(rdma_context.cpp:530):
- IPv4 GID + 有网络设备(最优)
- IPv4 GID + 无网络设备(降级但可用)
- IPv6 GID + 有网络设备
- IPv6 GID + 无网络设备
// rdma_context.cpp:542-594
for (i = 0; i < port_attr.gid_tbl_len; i++) {
if (ibv_query_gid_ex(context, port, i, &gid_entry, 0)) break;
if (gid_entry.gid_type != IBV_GID_TYPE_ROCE_V2 &&
gid_entry.gid_type != IBV_GID_TYPE_IB) continue;
// IPv4优先...
}
用户也可以通过 MC_GID_INDEX 环境变量覆盖自动选择(对应 config.h:38 的 gid_index = -1 默认值,-1 表示自动选择)。
3.3 QP Number
**谁决定的?**硬件(HCA/RDMA 网卡)在 ibv_create_qp() 时分配。
谁获取的?RdmaEndPoint 创建 QP 后从 qp->qp_num 读取。
// rdma_endpoint.cpp:74-82 (construct阶段)
for (size_t i = 0; i < num_qp_list; ++i) {
// ...
attr.qp_type = IBV_QPT_RC; // Reliable Connection
qp_list_[i] = ibv_create_qp(context_.pd(), &attr);
// QP号自动分配,后续可通过 qp_list_[i]->qp_num 读取
}
// rdma_endpoint.cpp:665-669 (读取)
std::vector<uint32_t> RdmaEndPoint::qpNum() const {
std::vector<uint32_t> ret;
for (int qp_index = 0; qp_index < (int)qp_list_.size(); ++qp_index)
ret.push_back(qp_list_[qp_index]->qp_num);
return ret;
}
分发流程:主动端通过 sendHandshake() RPC 将自己的 qp_num 发给对端;对端收到后,用这些 QP 号填充 ibv_qp_attr.dest_qp_num 完成状态机转换。
3.4 完整的分发时序
时间线: 主动端 (Initiator) 被动端 (Target)
│
│ [RdmaContext 初始化]
│ - ibv_open_device()
│ - ibv_query_port() → LID
│ - findBestGidIndex() → gid_index
│ - ibv_query_gid() → GID
│
│ [RdmaEndPoint.construct()]
│ - ibv_create_qp() × N
│ - qp_list_[i]->qp_num ← 硬件分配
│
│ [setupConnectionsByActive()]
│ - local_desc.local_lid = context_.lid()
│ - local_desc.local_gid = context_.gid()
│ - local_desc.qp_num = qpNum()
│ ─── TCP+JSON → sendHandshake(peer, local_desc) ────→
│ │
│ │ [onSetupRdmaConnections RPC回调]
│ │ - 解析 peer_desc (收到对端QP)
│ │ - 构造 local_desc (本地QP)
│ │ ├─ local_lid = context_.lid()
│ │ ├─ local_gid = context_.gid()
│ │ └─ qp_num = qpNum()
│ │
│ ←─── TCP+JSON ← local_desc ←───────────────────── │
│ │
│ [doSetupConnection()] │ [doSetupConnection()]
│ RESET→INIT→RTR→RTS │ RESET→INIT→RTR→RTS
│ │
│ [CONNECTED] │ [CONNECTED]
│ ═══════ RDMA Read/Write 数据传输 ═══════════════════│
4. QP 状态机:RESET to INIT to RTR to RTS
这是 RDMA RC(Reliable Connection)QP 的四步经典状态转换,Mooncake 在 RdmaEndPoint::doSetupConnection() 方法中实现(rdma_endpoint.cpp:744)。
4.1 状态机总览
[任意状态]
│ ibv_modify_qp(IBV_QPS_RESET) // 步骤0: 回到起点
▼
┌─────────┐
│ RESET │
└────┬────┘
│ ibv_modify_qp(IBV_QPS_INIT) // 步骤1: 指定本机端口和能力
│ + port_num, pkey_index, access_flags
▼
┌─────────┐
│ INIT │
└────┬────┘
│ ibv_modify_qp(IBV_QPS_RTR) // 步骤2: 指定对端地址和MTU
│ + MTU, GID, LID, dest_qp_num, RQ_PSN
▼
┌─────────┐
│ RTR │ (Ready to Receive — 可以接收对端数据)
└────┬────┘
│ ibv_modify_qp(IBV_QPS_RTS) // 步骤3: 指定本地发送参数
│ + timeout, retry_cnt, SQ_PSN, max_rd_atomic
▼
┌─────────┐
│ RTS │ (Ready to Send — 双向数据面可用)
└─────────┘
4.2 步骤0:Any State to RESET
// rdma_endpoint.cpp:751-761
ibv_qp_attr attr;
memset(&attr, 0, sizeof(attr));
attr.qp_state = IBV_QPS_RESET;
int ret = ibv_modify_qp(qp, &attr, IBV_QP_STATE);
这一步将 QP 拉回到 RESET 状态。对于新创建的 QP(默认就在 RESET),Mooncake 仍显式执行此步骤作为安全保护。但需要注意:根据 InfiniBand 规范,从 RESET 到 RESET 的状态转换是无效的(invalid transition),某些驱动实现在 QP 已经在 RESET 状态时调用 ibv_modify_qp(IBV_QPS_RESET) 可能会失败。如果失败,代码返回 ERR_ENDPOINT 并设置 reply_msg。
4.3 步骤1:RESET to INIT
// rdma_endpoint.cpp:763-779
memset(&attr, 0, sizeof(attr));
attr.qp_state = IBV_QPS_INIT;
attr.port_num = context_.portNum(); // 物理端口号 (通常为1)
attr.pkey_index = globalConfig().pkey_index; // 分区键索引 (默认0)
attr.qp_access_flags =
IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_READ |
IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_REMOTE_ATOMIC;
ret = ibv_modify_qp(
qp, &attr,
IBV_QP_STATE | IBV_QP_PKEY_INDEX | IBV_QP_PORT | IBV_QP_ACCESS_FLAGS);
在这一步,QP 从 “未配置” 变为 “已配置本地属性”:
- port_num:告诉 HCA 使用哪个物理端口
- pkey_index:分区键,用于 InfiniBand 分区隔离
- qp_access_flags:声明该 QP 支持的操作类型(本地写、远程读、远程写、远程原子操作)
注意:INIT 状态下的 QP 不能收发任何数据。
4.4 步骤2:INIT to RTR
// rdma_endpoint.cpp:782-816
memset(&attr, 0, sizeof(attr));
attr.qp_state = IBV_QPS_RTR;
// MTU 协商
attr.path_mtu = context_.activeMTU();
if (globalConfig().mtu_length < attr.path_mtu)
attr.path_mtu = globalConfig().mtu_length;
// GID 路由信息 (RoCE v2 必须)
attr.ah_attr.grh.dgid = peer_gid;
attr.ah_attr.grh.sgid_index = context_.gidIndex();
attr.ah_attr.grh.hop_limit = 16;
if (globalConfig().ib_traffic_class >= 0) {
attr.ah_attr.grh.traffic_class = (uint8_t)globalConfig().ib_traffic_class;
}
// LID 路由信息
attr.ah_attr.dlid = peer_lid;
attr.ah_attr.sl = 0; // Service Level
attr.ah_attr.src_path_bits = 0;
attr.ah_attr.static_rate = 0;
attr.ah_attr.is_global = 1; // 启用 GRH (RoCE v2 必需)
attr.ah_attr.port_num = context_.portNum();
// 对端 QP 信息
attr.dest_qp_num = peer_qp_num;
attr.rq_psn = 0; // Receive Queue Packet Sequence Number
attr.max_dest_rd_atomic = 16;
attr.min_rnr_timer = 12;
ret = ibv_modify_qp(qp, &attr,
IBV_QP_STATE | IBV_QP_PATH_MTU | IBV_QP_MIN_RNR_TIMER |
IBV_QP_AV | IBV_QP_MAX_DEST_RD_ATOMIC |
IBV_QP_DEST_QPN | IBV_QP_RQ_PSN);
这是最关键的一步,包含了连接对端所需的所有网络路由信息:
| 参数 | 作用 | 来源 |
|---|---|---|
path_mtu | 路径 MTU,取本地 active_mtu 和配置 mtu_length 的较小值 | context_.activeMTU() |
ah_attr.grh.dgid | 对端 GID(用于 RoCE v2 路由) | 对端握手传来的 local_gid |
ah_attr.grh.sgid_index | 本地 GID 索引 | context_.gidIndex() |
ah_attr.grh.hop_limit | IPv6 跳数限制(固定 16) | 常量 MAX_HOP_LIMIT |
ah_attr.dlid | 对端 LID | 对端握手传来的 local_lid |
ah_attr.is_global | 启用 GRH 头部(RoCE v2 = 1) | 固定为 1 |
dest_qp_num | 对端 QP 号码 | 对端握手传来的 qp_num |
rq_psn | 接收队列包序列号,初始 0 | 固定为 0 |
4.5 步骤3:RTR to RTS
// rdma_endpoint.cpp:819-835
memset(&attr, 0, sizeof(attr));
attr.qp_state = IBV_QPS_RTS;
attr.timeout = 14; // 超时时间 (4.096us × 2^timeout)
attr.retry_cnt = 7; // 发送重试次数
attr.rnr_retry = 7; // RNR NAK 重试次数
attr.sq_psn = 0; // Send Queue Packet Sequence Number
attr.max_rd_atomic = 16; // 最大并发 RDMA atomic 操作数
ret = ibv_modify_qp(qp, &attr,
IBV_QP_STATE | IBV_QP_TIMEOUT | IBV_QP_RETRY_CNT |
IBV_QP_RNR_RETRY | IBV_QP_SQ_PSN |
IBV_QP_MAX_QP_RD_ATOMIC);
进入 RTS 后,QP 可以双向发送和接收数据。这些参数控制数据面的可靠性:
| 参数 | 含义 | 默认值 |
|---|---|---|
timeout | 响应等待超时(约 67 毫秒) | 14 (4.096us × 2^14 ≈ 67ms) |
retry_cnt | 重试次数(超时后) | 7 |
rnr_retry | RNR(接收方未就绪)重试次数 | 7 |
sq_psn | 发送队列包序列号 | 0 |
max_rd_atomic | 最大并发 RDMA atomic | 16 |
5. RTR 与 RTS 之间:有无数据探测?各阶段失败怎么办
5.1 RTR to RTS 之间没有数据探测
RTR to RTS 是纯本地硬件寄存器操作,不涉及任何网络 I/O。ibv_modify_qp(IBV_QPS_RTS) 只是一个本地 QP 属性修改调用,告诉 HCA:
- 现在允许主动发送数据(在此之前 QP 只能接收)
- 这是本地状态变化,不会向对端发送任何探测包
代码上非常清晰——就是一行 ibv_modify_qp 调用,没有 ibv_post_send 或任何数据传输。
5.2 各阶段失败的错误处理
// rdma_endpoint.cpp:754-761 - Any → RESET 失败
if (ret) {
std::string message = "Failed to modify QP to RESET";
PLOG(ERROR) << "[Handshake] " << message;
if (reply_msg) *reply_msg = message + ": " + strerror(errno);
return ERR_ENDPOINT;
}
// rdma_endpoint.cpp:773-779 - RESET → INIT 失败
if (ret) {
std::string message =
"Failed to modify QP to INIT, check local context port num";
PLOG(ERROR) << "[Handshake] " << message;
if (reply_msg) *reply_msg = message + ": " + strerror(errno);
return ERR_ENDPOINT;
}
// rdma_endpoint.cpp:811-816 - INIT → RTR 失败
if (ret) {
std::string message =
"Failed to modify QP to RTR, check mtu, gid, peer lid, peer qp num";
PLOG(ERROR) << "[Handshake] " << message;
if (reply_msg) *reply_msg = message + ": " + strerror(errno);
return ERR_ENDPOINT;
}
// rdma_endpoint.cpp:831-835 - RTR → RTS 失败
if (ret) {
std::string message = "Failed to modify QP to RTS";
PLOG(ERROR) << "[Handshake] " << message;
if (reply_msg) *reply_msg = message + ": " + strerror(errno);
return ERR_ENDPOINT;
}
失败处理的关键设计:
-
reply_msg 传回对端:被动端失败时,
reply_msg通过握手 RPC 传回给发起端,发起端在setupConnectionsByActive()中检查:if (!peer_desc.reply_msg.empty()) { LOG(ERROR) << "Rejected handshake request by peer " << local_desc.peer_nic_path; disconnectUnlocked(); return ERR_REJECT_HANDSHAKE; } -
resetConnection 而不是 disconnect:当连接建立失败时,主动端调用
resetConnection("failed connection setup (active)")。对于普通 RDMA 设备,这就是disconnectUnlocked()(将 QP 恢复到 RESET)。对于阿里云 eRDMA 设备(CONFIG_ERDMA),需要reconstruct()销毁并重新创建 QP,因为 eRDMA 设备不支持 QP 从 RTS 回到 RESET 后再到 RTS。 -
并发握手安全:
setupConnectionsByActive()实现了”同时打开”(simultaneous open)的并发策略。如果两边同时发起主动握手,锁机制保证不会出现重复连接:// rdma_endpoint.cpp:304 while (status_.load(std::memory_order_acquire) == CONNECTING) { // spin + 指数退避 sleep,等待另一个线程完成握手 // 最多等待 10 秒 }
6. INIT to RTR 失败时如何知道 MTU 不匹配 / GID 路由无效
6.1 通过 ibv_modify_qp 的 errno 判断
硬件/HCA 驱动在 ibv_modify_qp 失败时会设置 errno。Mooncake 将 errno 和 strerror(errno) 拼接进错误信息:
if (reply_msg) *reply_msg = message + ": " + strerror(errno);
重要的是:ibv_modify_qp 返回的具体 errno 值完全取决于驱动实现(例如 mlx5、eRDMA、EFA 等),不同驱动可能对同一种失败返回不同的 errno。以下列出常见驱动中可能出现的 errno 与失败的对应关系(仅供参考,非标准保证):
| 失败原因 | 驱动可能返回的 errno | 含义 |
|---|---|---|
| MTU 不匹配 | EINVAL | 请求的 path_mtu 不被支持(大于对端端口 MTU 或路径 MTU) |
| GID 路由无效 | EHOSTUNREACH / ENETUNREACH | 无法到达对端 GID(ARP 失败、路由表中不存在) |
| 对端 QP 号不存在 | EINVAL | dest_qp_num 指向的对端 QP 不存在或类型不匹配 |
| 对端 LID 无效 | EINVAL / ENODEV | LID 不在子网中(InfiniBand 场景) |
| 本地端口未激活 | EINVAL / EIO | port_num 对应的物理端口非 ACTIVE 状态 |
例如,在 mlx5 驱动下,ibv_modify_qp 从 INIT 到 RTR 的多种不同失败场景都可能返回 EINVAL,仅靠 errno 无法区分具体原因(MTU 不匹配还是 QP 号无效)。此时需要结合日志中 strerror(errno) 的文本描述和上下文环境进行综合判断。
6.2 MTU 协商机制
Mooncake 在 INIT to RTR 时的 MTU 选择逻辑:
attr.path_mtu = context_.activeMTU(); // 本地端口激活 MTU
if (globalConfig().mtu_length < attr.path_mtu) // 取与配置MTU的较小值
attr.path_mtu = globalConfig().mtu_length;
activeMTU()来自ibv_query_port()返回的port_attr.active_mtu,这是该物理端口的实际 MTU。globalConfig().mtu_length默认值为IBV_MTU_4096(config.h:47),可通过MC_MTU环境变量调整。- 最终以两者的较小值为准。
- 如果该 MTU 大于对端端口的
active_mtu,ibv_modify_qp会返回失败(通常errno = EINVAL)。
6.3 日志中的诊断信息
Mooncake 在日志中使用 [Handshake] 标签标记所有握手相关错误,通过 PLOG(ERROR) 输出完整的 errno 描述。管理员可以通过以下关键字搜索问题:
[Handshake] Failed to modify QP to RTR, check mtu, gid, peer lid, peer qp num: Invalid argument
[Handshake] Failed to modify QP to RTR, check mtu, gid, peer lid, peer qp num: No route to host
[Handshake] Failed to modify QP to INIT, check local context port num: Invalid argument
[Handshake] QP count mismatch in peer and local endpoints, check MC_MAX_EP_PER_CTX
7. 握手的 RPC 通信框架:裸 TCP Socket + JSON
Mooncake 没有使用 gRPC、Thrift 或任何 RPC 框架。控制面使用原生 TCP Socket + JSON 序列化,实现在 SocketHandShakePlugin 类(transfer_metadata_plugin.cpp:616)。
7.1 架构层次
TransferMetadata (编排层)
│
├── sendHandshake() active端 → 构造JSON → TCP connect → send → recv → 解析JSON
│ passive端 → TCP accept → recv → 回调 → 构造JSON → send
│
└── HandShakePlugin (抽象接口)
│
└── SocketHandShakePlugin (TCP+JSON 实现)
├── startDaemon() 启动 epoll 监听线程
├── send() TCP client 发送握手请求
├── sendNotify() TCP client 发送通知
├── sendProbe() TCP client 发送探针
└── exchangeMetadata() 元数据交换(metadata模式)
7.2 消息协议
每条消息的格式(新协议):
[8字节长度 (uint64_t, 网络字节序)][1字节类型 (uint8_t)][N字节 JSON 负载]
关键实现(common.h:365-381):
static inline int writeString(int fd, const HandShakeRequestType type,
const std::string &str) {
uint8_t byte = static_cast<uint8_t>(type);
uint64_t length = str.size() +
(type == HandShakeRequestType::OldProtocol ? 0 : sizeof(byte));
writeFully(fd, &length, sizeof(length)); // 先写8字节长度
if (type != HandShakeRequestType::OldProtocol)
writeFully(fd, &byte, sizeof(byte)); // 再写1字节类型(仅新协议)
writeFully(fd, str.data(), str.size()); // 最后写JSON负载
}
消息类型枚举(common.h:59-65):
enum class HandShakeRequestType {
Connection = 0, // 握手连接请求
Metadata = 1, // 元数据交换
Notify = 2, // 通知消息
Probe = 3, // 探针消息
OldProtocol = 0xff, // 旧协议占位符(无类型字节,仅用长度前缀)
};
读取端的协议检测逻辑(common.h:418-452):
- 先读取 8 字节
length(uint64_t),然后读取length字节的 payload。 - 如果 payload 首字节
<= 3(即Probe的最大值),则判定为新协议:首字节 = 类型,剩余字节 = JSON。 - 否则判定为
OldProtocol = 0xff:整个 payload = JSON(不带类型字节)。
static inline std::pair<HandShakeRequestType, std::string> readString(int fd) {
HandShakeRequestType type = HandShakeRequestType::Connection;
// ...
uint64_t length = 0;
readFully(fd, &length, sizeof(length)); // 先读8字节长度
std::vector<char> buffer(length);
readFully(fd, buffer.data(), length); // 再读length字节数据
if (buffer[0] <= static_cast<char>(HandShakeRequestType::Probe)) {
// 新协议:首字节是类型,之后是JSON
type = static_cast<HandShakeRequestType>(buffer[0]);
str.assign(buffer.data() + sizeof(char), length - sizeof(char));
} else {
// 旧协议:无类型字节,全部是JSON
type = HandShakeRequestType::OldProtocol;
str.assign(buffer.data(), length);
}
return {type, str};
}
协议设计要点:
- 长度字段在前(8字节 uint64_t),类型字段在后(1字节 uint8_t),这使得接收端可以先读取固定长度头来确定后续数据量。
- 旧协议兼容:
OldProtocol = 0xff是一种不带类型字节的降级格式,仅在解码时使用。新协议的writeString不会以OldProtocol作为参数调用——发送端总是使用Connection/Metadata/Notify/Probe,类型字节始终写入。 - 长度语义:
length字段的值 = JSON 大小 +(新协议时)1 字节类型。接收端用length控制readFully的读取量。
7.3 服务端(被动端)监听流程
// transfer_metadata_plugin.cpp:654
virtual int startDaemon(uint16_t listen_port, int sockfd) {
// 1. 创建 TCP socket (IPv4 或 IPv6)
listen_fd_ = socket(globalConfig().use_ipv6 ? AF_INET6 : AF_INET,
SOCK_STREAM, 0);
// 2. 设置 SO_RCVTIMEO (1秒) 和 SO_REUSEADDR
// 3. bind + listen (backlog = handshake_listen_backlog, 默认128)
// 4. 启动监听线程:
listener_ = std::thread([this]() {
while (listener_running_) {
int conn_fd = accept(listen_fd_, ...);
// 5. 每个连接设置 60 秒读超时
// 6. 读取消息类型 + JSON
auto [type, json_str] = readString(conn_fd);
// 7. 解析 JSON
parseJsonString(json_str, peer);
// 8. 根据消息类型调用回调
on_connection_callback_(peer, local); // 握手
// 9. 发送回复
writeString(conn_fd, type, Json::FastWriter{}.write(local));
// 10. shutdown(SHUT_WR) + 等待对端关闭
close(conn_fd);
}
});
}
7.4 客户端(主动端)发送流程
// transfer_metadata_plugin.cpp:902
virtual int send(std::string ip_or_host_name, uint16_t rpc_port,
const Json::Value &local, Json::Value &peer) {
// 1. DNS 解析 (getaddrinfo)
getaddrinfo(ip_or_host_name.c_str(), service, &hints, &result);
// 2. 尝试每个解析到的地址
for (rp = result; rp; rp = rp->ai_next) {
ret = doSend(rp, local, peer);
if (ret == 0) return 0; // 成功则返回
}
}
// doSend():
int doSend(struct addrinfo *addr, const Json::Value &local, Json::Value &peer) {
// 1. socket() + SO_REUSEADDR + 60秒超时
// 2. connect()
// 3. writeString(type=Connection, JSON)
// 4. readString() 读取回复
// 5. parseJsonString()
// 6. close()
}
7.5 默认端口和配置
// config.h
uint16_t handshake_port = 12001; // 握手默认端口 (MC_HANDSHAKE_PORT)
uint16_t rpc_min_port = 15000; // RPC 端口范围下限
uint16_t rpc_max_port = 17000; // RPC 端口范围上限
int handshake_listen_backlog = 128; // TCP listen backlog
7.6 为什么不使用 RPC 框架?
不使用 gRPC/Thrift 的原因:
- 依赖最小化:Mooncake 旨在作为一个可嵌入的传输库,不应强制引入 gRPC 等大型依赖。
- 握手消息简单:握手只需要交换 < 1KB 的 JSON,不需要流式传输、protobuf 编解码或高级 RPC 功能。
- 性能无影响:握手只在连接建立时发生一次(或重连时),不是热路径,TCP+JSON 的延迟(~1ms)完全可以接受。
- 兼容性:TCP socket + JSON 不依赖任何版本兼容性问题。
8. 各后端握手差异对比
Mooncake 支持多种传输后端,握手机制有所不同:
| 后端 | API | 状态机 | 握手关键字段 | 特点 |
|---|---|---|---|---|
| RDMA | libibverbs | RESET→INIT→RTR→RTS | LID, GID, QP_NUM | 经典 RC QP 模型,每个对端有一组 QP |
| EFA | libfabric | 无 QP 状态机 | efa_addr | 共享端点模型(FI_EP_RDM),fi_av_insert() 替代 ibv_modify_qp |
| Barex | XFabric 自定义库 | 自定义 | barex_port | 不走 Verbs API,使用 XFabric 实现 RDMA |
| UB/URMA | 鲲鹏自定义 API | 自定义 | jetty_num | 华为鲲鹏平台专用 |
EFA 握手的关键差异
EFA 使用 libfabric 的 SRD(Scalable Reliable Datagram)模型,握手完全不同。以下是简化后的核心流程(efa_endpoint.cpp:42-89):
// 简化流程(省略了 loopback 检测、NIC 路径解析、空地址检查和错误处理)
int EfaEndPoint::setupConnectionsByActive() {
// 1. 构造 HandShakeDesc (携带 efa_addr)
local_desc.efa_addr = context_.localEpAddr();
// 2. RPC 交换地址
context_.engine().sendHandshake(peer_server_name, local_desc, peer_desc);
// 3. 将对方地址插入地址向量 (AV)
context_.insertPeerAddr(peer_desc.efa_addr, peer_fi_addr_);
// 4. 完成!不需要 ibv_modify_qp
status_.store(CONNECTED, std::memory_order_release);
}
实际源码还包含:loopback 自环检测(直接使用二进制地址避免 hex 编解码往返)、NIC 路径解析(getServerNameFromNicPath/getNicNameFromNicPath)、对端 efa_addr 空值检查等逻辑。
EFA 的 fi_getname() 获取的是 libfabric 端点地址,不是 LID/GID。因此 HandShakeDesc 中 local_lid、local_gid、qp_num 字段对于 EFA 后端不被使用,改用 efa_addr。
9. 代码验证
以下是通过源码阅读验证的关键声明:
9.1 HandShakeDesc 结构体定义
文件: mooncake-transfer-engine/include/transfer_metadata.h:119-135
结构体包含 local_nic_path、local_lid、local_gid、peer_nic_path、qp_num、reply_msg,以及条件编译的 jetty_num(USE_UB)、barex_port(USE_BAREX)、efa_addr(USE_EFA)字段。验证通过。
9.2 QP 状态机四步转换
文件: mooncake-transfer-engine/src/transport/rdma_transport/rdma_endpoint.cpp:744-837
doSetupConnection(int qp_index, ...) 方法按顺序执行:
- 任意状态 → RESET (754行)
- RESET → INIT (764行)
- INIT → RTR (782行)
- RTR → RTS (819行)
验证通过。
9.3 LID 和 GID 来源
文件: mooncake-transfer-engine/src/transport/rdma_transport/rdma_context.cpp
- LID:
lid_ = attr.lid;(805行),来自ibv_query_port()返回的port_attr.lid - GID:
ibv_query_gid(context, port, gid_index, &gid_);(787行),GID index 通过findBestGidIndex()(530行) 自动选择 - QP 号码:
qp_list_[i] = ibv_create_qp(context_.pd(), &attr);(81行),硬件分配
验证通过。
9.4 RTR 到 RTS 之间无数据探测
文件: mooncake-transfer-engine/src/transport/rdma_transport/rdma_endpoint.cpp:819-835
RTR → RTS 转换只有 ibv_modify_qp 调用,设置 timeout、retry_cnt、rnr_retry、sq_psn、max_rd_atomic 等本地属性。代码路径中没有 ibv_post_send 或任何网络 I/O。验证通过。
9.5 RPC 框架为原始 TCP Socket + JSON
文件: mooncake-transfer-engine/src/transfer_metadata_plugin.cpp:616-1007
SocketHandShakePlugin 类实现了 HandShakePlugin 接口,使用 socket()/connect()/accept() + Json::FastWriter/parseJsonString() 完成通信。不依赖任何 RPC 框架。验证通过。
9.6 消息协议 wire format
文件: mooncake-transfer-engine/include/common.h:365-453
writeString()(365-381行):先写 8 字节uint64_t长度,再写 1 字节uint8_t类型(新协议),最后写 JSON。readString()(418-452行):先读 8 字节长度,再读length字节数据,通过首字节判断新旧协议并提取类型和 JSON。HandShakeRequestType(59-65行):Connection=0, Metadata=1, Notify=2, Probe=3, OldProtocol=0xff。
验证通过。
9.7 MTU 协商逻辑
文件: mooncake-transfer-engine/src/transport/rdma_transport/rdma_endpoint.cpp:784-786
attr.path_mtu = context_.activeMTU();
if (globalConfig().mtu_length < attr.path_mtu)
attr.path_mtu = globalConfig().mtu_length;
取本地端口 active MTU 和配置 MTU 的较小值。验证通过。
9.8 INIT → RTR 失败的错误信息
文件: mooncake-transfer-engine/src/transport/rdma_transport/rdma_endpoint.cpp:811-816
错误信息为 "Failed to modify QP to RTR, check mtu, gid, peer lid, peer qp num",附带 strerror(errno)。验证通过。
9.9 eRDMA 特殊处理
文件: mooncake-transfer-engine/src/transport/rdma_transport/rdma_endpoint.cpp:544-562
int RdmaEndPoint::resetConnection(const std::string &reason) {
#ifdef CONFIG_ERDMA
int ret = reconstruct(); // 销毁并重建 QP
#else
int ret = disconnectUnlocked(); // 仅将 QP 回到 RESET
#endif
}
阿里云 eRDMA 的 QP 一旦进入 RTS 后不能直接重新回到 RTS,需要 reconstruct() 销毁并重新创建 QP。验证通过。
总结
Mooncake Transfer Engine 的握手机制是一个设计精良的分层系统:
- HandShakeDesc 作为握手的统一数据格式,通过条件编译支持多种后端
- LID/GID/QP 号的获取完全依赖标准 RDMA 硬件接口,分发通过 TCP+JSON RPC 完成
- RESET→INIT→RTR→RTS 四步状态机严格遵循 InfiniBand Verbs 规范
- RTR→RTS 之间无数据探测,是纯本地硬件操作
- 错误诊断通过 errno + strerror 传递到对端,日志使用
[Handshake]标签便于排查 - RPC 框架是极简的 TCP socket + JSON,零依赖,适合嵌入式传输库场景
修订说明
修订日期: 2026-05-26
本文基于 review notes 进行了以下修正:
错误修正(3处)
-
7.2 节 wire format 修正:原错误描述为
[4字节类型][4字节长度][N字节JSON],实际格式为[8字节长度 (uint64_t)][1字节类型 (uint8_t)][N字节JSON]。长度字段排在类型之前,长度为 8 字节而非 4 字节,类型为 1 字节而非 4 字节。已根据common.h:365-453的writeString/readString实现修正,并补充了OldProtocol无类型字节的说明。 -
7.2 节 OldProtocol 枚举值修正:原错误写为
OldProtocol = 0,实际值为OldProtocol = 0xff。Connection = 0与OldProtocol = 0xff是完全不同的协议模式——Connection使用带类型字节的新协议格式,而OldProtocol是旧协议降级路径,不带类型字节。已根据common.h:59-65修正。 -
4.5 节 timeout 单位修正:原表中”含义”列写成”约 67 秒”,正确应为”约 67 毫秒”(4.096us × 2^14 = 67ms)。表格中”默认值”列已有正确的 “67ms” 标注,与含义列矛盾,现已统一修正。
表述精确化(4处)
-
2 节 HandShakeDesc 字段顺序:原代码块将
#ifdef USE_BAREX和#ifdef USE_UB放在reply_msg之后,实际源码中这两个块位于peer_nic_path和qp_num之间。已根据transfer_metadata.h:119-135的实际字段排列修正代码块,并添加了字段布局说明。 -
4.2 节 RESET 幂等性:原描述”这是幂等操作”不准确。InfiniBand 规范中 RESET→RESET 是无效状态转换,某些驱动实现在 QP 已处于 RESET 时调用可能失败。已在原文补充说明。
-
6.1 节 errno 对照表:原表以确定性方式列出 errno 与失败原因的对应关系,但实际上
ibv_modify_qp返回的 errno 值完全取决于驱动实现,不同驱动可能对同一场景返回不同 errno(例如 mlx5 驱动下多种失败均返回EINVAL)。已将表格标注为”驱动可能返回的 errno(仅供参考,非标准保证)“,并添加了说明。 -
8 节 EFA 代码清单:原代码块标注为
efa_endpoint.cpp:42的逐字源码,实际是省略了 loopback 检测、NIC 路径解析、错误处理等逻辑的简化版。已在代码块上方添加”简化流程”标注,并在下方补充说明实际源码包含的额外逻辑。