团团虾导读:元数据管理是 Mooncake 集群中节点发现和内存发现的基石。TE 支持两种模式:集中式(通过 etcd/Redis/HTTP 做 shared discovery)和去中心化(P2P Handshake 模式,适合简单部署)。这篇以 Segment 注册为例,完整追踪了一个节点从 init → registerLocalMemory → publish to etcd → 远端 openSegment 的全过程。
阅读版本: Mooncake v0.3.10.post2-104-geaf724ab (commit eaf724ab, 2026-05-20)
Mooncake Transfer Engine 元数据管理:以 Segment 注册为例
一、架构概览
Mooncake Transfer Engine 的元数据管理由 TransferMetadata 类统一负责,支持两种运行模式:
| 模式 | 触发方式 | 存储后端 | 适用场景 |
|---|---|---|---|
| 集中式(默认) | 连接字符串以 etcd:///redis:///http:// 开头 | etcd / Redis / HTTP K-V 服务 | 多节点集群,需要中心化发现 |
| 去中心化 | 连接字符串为字面量 P2PHANDSHAKE | 无集中存储,TCP 直连交换元数据 | 简单部署、测试、两节点直连 |
整个元数据系统位于 mooncake-transfer-engine/ 中,核心文件包括:
| 文件 | 职责 |
|---|---|
include/transfer_metadata.h | TransferMetadata + 所有描述符结构体定义 |
src/transfer_metadata.cpp | Segment 增删查、缓存、JSON 编解码 |
include/transfer_metadata_plugin.h | MetadataStoragePlugin 和 HandShakePlugin 抽象接口 |
src/transfer_metadata_plugin.cpp | 三个存储后端(Etcd/Redis/HTTP)+ TCP 握手的实现 |
mooncake-common/etcd/etcd_wrapper.go | Go etcd v3 客户端,编译为 C 共享库供 C++ 调用 |
二、集中式模式:让 etcd 告诉我们每个节点有什么
集中式模式下,所有节点的 Segment 描述符和 RPC 地址写入同一个 K-V 存储。这是一个典型的 share-nothing metadata, shared discovery 设计 —— 每个节点只管理自己的描述符,但所有人都可以从共享存储中发现其他人。
2.1 引擎初始化时注册 RPC 地址入口
TransferEngineImpl::init() 在做传输层初始化之前,会先用 TransferMetadata 在存储集群里登记自己的位置:
引擎启动 (transfer_engine_impl.cpp:77-368):
// 1. 解析 local_server_name(格式 ip:port)
// 2. 构造 RpcMetaDesc(ip + port)
desc.ip_or_host_name = host_name;
desc.rpc_port = port;
// 3. 创建 TransferMetadata,传入 etcd 连接字符串
metadata_ = std::make_shared<TransferMetadata>(metadata_conn_string);
// 4. 发布本节点 RPC 地址到中心存储
metadata_->addRpcMetaEntry(local_server_name_, desc);
// 5. 发现本地拓扑、安装传输插件(RDMA/TCP/NVLink等)
multi_transports_ = std::make_shared<MultiTransport>(metadata_, local_server_name_);
2.2 注册本地内存,将 Segment 描述符写入 etcd
当调用方执行 registerLocalMemory(addr, length, ...) 后,各 Transport 在注册硬件 MR 的同时,会把缓冲区信息追加到本地 Segment 描述符中,并推送到存储后端。
Transport 注册内存 (transfer_engine_impl.cpp:511-533):
for (auto transport : multi_transports_->listTransports()) {
int ret = transport->registerLocalMemory(
addr, length, location, remote_accessible, update_metadata);
if (ret < 0) return ret;
}
元数据层追加 BufferDesc 并更新 (transfer_metadata.cpp:1059-1071):
int TransferMetadata::addLocalMemoryBuffer(const BufferDesc &buffer_desc,
bool update_metadata) {
RWSpinlock::WriteGuard guard(segment_lock_);
auto new_segment_desc = std::make_shared<SegmentDesc>();
auto &segment_desc = segment_id_to_desc_map_[LOCAL_SEGMENT_ID];
*new_segment_desc = *segment_desc; // CoW 拷贝
segment_desc = new_segment_desc;
segment_desc->buffers.push_back(buffer_desc); // 追加新 buffer
// ...
if (update_metadata) return updateLocalSegmentDesc(); // 序列化并写入存储
}
updateLocalSegmentDesc() 调用 updateSegmentDesc(),将整个 SegmentDesc 序列化为 JSON 后调用 storage_plugin_->set(key, json) 写入中心存储。
2.3 Segment 描述符的 Key 格式
Key 的生成逻辑在 getFullMetadataKey() (transfer_metadata.cpp:169-184):
// 默认 key 前缀为 "mooncake/<cluster_id>/"
// <cluster_id> 来自环境变量 MC_METADATA_CLUSTER_ID
// 普通 segment name -> mooncake/<cluster_id>/ram/<name>
// 包含 '/' 的 name -> mooncake/<cluster_id>/<name>
return common_key_prefix_ + "ram/" + segment_name;
RPC 元数据的 key 格式类似:
mooncake/<cluster_id>/rpc_meta/<server_name>
2.4 发现远端 Segment:openSegment 的完整链路
当 openSegment("10.0.0.2:12345") 被调用时,元数据层的流程如下:
TransferEngineImpl (transfer_engine_impl.cpp:453-476):
SegmentHandle TransferEngineImpl::openSegment(const std::string& segment_name) {
// 1. 调用元数据层获取 SegmentID
SegmentID sid = metadata_->getSegmentID(trimmed_segment_name);
return sid;
}
TransferMetadata::getSegmentID (transfer_metadata.cpp:1004-1024):
SegmentID TransferMetadata::getSegmentID(const std::string &segment_name) {
// 1. 先查本地缓存 (name -> id)
{
RWSpinlock::ReadGuard guard(segment_lock_);
if (segment_name_to_id_map_.count(segment_name))
return segment_name_to_id_map_[segment_name];
}
// 2. 缓存未命中,从远程存储获取描述符(可能涉及网络 I/O)
auto segment_desc = this->getSegmentDesc(segment_name);
if (!segment_desc) return -1;
// 3. WriteLock + double-check,分配新 ID,更新缓存
RWSpinlock::WriteGuard guard(segment_lock_);
if (segment_name_to_id_map_.count(segment_name))
return segment_name_to_id_map_[segment_name];
SegmentID id = next_segment_id_.fetch_add(1);
segment_id_to_desc_map_[id] = segment_desc;
segment_name_to_id_map_[segment_name] = id;
return id;
}
getSegmentDesc 中心化路径 (transfer_metadata.cpp:865-901):
// 中心化模式:从存储插件 GET
if (!storage_plugin_->get(getFullMetadataKey(segment_name), peer_json)) {
LOG(WARNING) << "Failed to retrieve segment descriptor";
return nullptr;
}
return decodeSegmentDesc(peer_json, segment_name);
2.5 元数据缓存
TransferMetadata 维护了两级缓存:
- name-to-id 映射
segment_name_to_id_map_(string -> uint64_t) - id-to-descriptor 映射
segment_id_to_desc_map_(uint64_t -> shared_ptr<SegmentDesc>)
两个 map 通过读写锁 segment_lock_ 保护。获取描述符时,默认启用 metacache(可通过 MC_DISABLE_METACACHE 关闭):
std::shared_ptr<SegmentDesc> getSegmentDescByName(
const std::string &segment_name, bool force_update) {
// metacache 开启时直接返回缓存,跳过网络 IO
if (globalConfig().metacache && !force_update) {
RWSpinlock::ReadGuard guard(segment_lock_);
auto iter = segment_name_to_id_map_.find(segment_name);
if (iter != segment_name_to_id_map_.end())
return segment_id_to_desc_map_[iter->second];
}
// ...
}
syncSegmentCache() 可在运行时强制从存储后端刷新缓存,支持全量或指定 Segment 刷新。
三、Segment 描述符的完整结构
Segment 描述符是元数据管理的核心对象,包含了远端节点进行 RDMA/GPU Direct 等操作所需的全部信息。结构体定义在 transfer_metadata.h:
struct SegmentDesc {
std::string name; // 节点名 (ip:port)
std::string protocol; // "rdma"|"tcp"|"nvlink"|"ascend"|"cxl"|"ub"|...
std::vector<DeviceDesc> devices; // RDMA/UB 设备信息
Topology topology; // NIC 优先级矩阵
std::vector<BufferDesc> buffers; // 注册的内存缓冲区列表
std::vector<NVMeoFBufferDesc> nvmeof_buffers; // NVMe-oF 描述符
std::string cxl_name; // CXL 设备名
uint64_t cxl_base_addr; // CXL 基地址
std::string timestamp; // 最后修改时间
RankInfoDesc rank_info; // Ascend NPU 专用
int tcp_data_port; // TCP 数据端口
};
每个 BufferDesc 包含 key、addr、length 和传输协议特定字段(rkey[]、lkey[]、shm_name、tseg[] 等)。描述符按协议类型使用不同的 JSON 编解码逻辑(见 encodeSegmentDesc/decodeSegmentDesc):
- RDMA/EFA: devices + buffers (rkey/lkey) + priority_matrix
- TCP: buffers (addr/length)
- NVLink/HIP/MACA/ubshmem: buffers (addr/length/shm_name) — 编码阶段五者共享同一分支 (transfer_metadata.cpp:423-427)
- UB: devices (eid) + buffers (tseg)
- nvmeof: buffers (file_path/length/local_path_map) — 仅解码阶段有对应分支 (transfer_metadata.cpp:767-777),编码阶段无此路径
- Ascend: devices + buffers + rank_info (rankId/deviceIp/endpoints)
- CXL: cxl_name/cxl_base_addr + buffers (offset/length)
- Multi-protocol: CXL+TCP 或 CXL+RDMA 组合
四、存储后端实现
4.1 EtcdStoragePlugin(默认)
Mooncake 使用 Go etcd v3 client 编译为 C 共享库 的方式集成 etcd,避免了 C++ 直接调用 etcd gRPC API 的复杂性:
- Go 侧 (
mooncake-common/etcd/etcd_wrapper.go): 使用go.etcd.io/etcd/client/v3实现NewEtcdClient、EtcdGetWrapper、EtcdPutWrapper、EtcdDeleteWrapper,通过 CGo 导出,编译为libetcd_wrapper.so - C++ 侧 (
transfer_metadata_plugin.cpp:447-520): 通过libetcd_wrapper.h调用上述 C 函数
Go 侧示例 —— EtcdPutWrapper:
func EtcdPutWrapper(key *C.char, value *C.char, errMsg **C.char) int {
k := C.GoString(key)
v := C.GoString(value)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_, err := globalClient.Put(ctx, k, v)
// ...
}
连接字符串格式:etcd://<host>:<port>,支持多 endpoints(用逗号或分号分隔)。
4.2 RedisStoragePlugin
基于 hiredis 库的直接封装:
构造: redisConnect(hostname, port)
GET: redisCommand(client_, "GET %s", key.c_str())
SET: redisCommand(client_, "SET %s %s", key.c_str(), json_file.c_str())
DEL: redisCommand(client_, "DEL %s", key.c_str())
支持认证:环境变量 MC_REDIS_USERNAME / MC_REDIS_PASSWORD / MC_REDIS_DB_INDEX。
连接字符串格式:redis://<host>:<port>
4.3 HTTPStoragePlugin
通过 libcurl 以 RESTful 方式访问 K-V 存储:
- GET:
GET <url>?key=<escaped_key> - SET:
PUT <url>?key=<escaped_key>+ JSON body - DELETE:
DELETE <url>?key=<escaped_key>
使用 thread_local curl handle 避免多线程竞争。
连接字符串格式:http://<url> 或 https://<url>
4.4 插件选择逻辑
MetadataStoragePlugin::Create() 通过两步确定后端:
- parseConnectionString (transfer_metadata_plugin.cpp:523-540): 解析
://前缀提取协议名。无://的字符串默认视为 etcd(如"10.0.0.1:2379"等价于etcd://10.0.0.1:2379)。 - Create 工厂 (transfer_metadata_plugin.cpp:542-593): 按编译宏匹配已知协议名,匹配成功则创建对应后端。所有已知协议(etcd/redis/http/https)均未匹配时,调用
LOG(FATAL)终止程序,不会回退到 etcd。
具体分发逻辑:
"etcd" -> EtcdStoragePlugin(需 USE_ETCD 编译宏, line 546-548)
"redis" -> RedisStoragePlugin(需 USE_REDIS 编译宏, line 552-578)
"http" -> HTTPStoragePlugin(需 USE_HTTP 编译宏, line 583-586)
"https" -> HTTPStoragePlugin(需 USE_HTTP 编译宏, line 583-586)
以上均未命中 -> LOG(FATAL) "Unable to find metadata storage plugin" (line 590)
五、去中心化 P2P 模式
当连接字符串为字面量 "P2PHANDSHAKE" 时,TransferMetadata 跳过所有中心化存储操作:
if (conn_string == P2PHANDSHAKE) {
p2p_handshake_mode_ = true;
return; // 不创建 storage_plugin_
}
5.1 SocketHandShakePlugin 的设计
这是一个基于 TCP 的异步握手服务:
-
启动: 在随机可用端口上启动 TCP listener 线程
-
消息协议: 长度前缀 + 1 字节消息类型 + JSON payload
-
三种在线消息类型 (common.h:59-66 定义的枚举值 0-3):
Connection = 0—— 握手连接Metadata = 1—— 交换 Segment 描述符Notify = 2—— 异步通知Probe = 3—— 存活探测
外加一个解码回退路径
OldProtocol = 0xff: 这不是真正的线格式类型。当readString收到消息的首字节 > 3 (Probe) 时,识别为旧版本协议(无类型字节),将其标记为 OldProtocol (common.h:443-449)。在服务端处理时,Connection 和 OldProtocol 两个类型走同一个回调 (on_connection_callback_),但线路上编码不同:新协议多一个类型字节,旧协议直接从 payload 开始。
5.2 P2P 模式下的 Segment 发现
openSegment("10.0.0.2:12345")
-> getSegmentID(name)
-> getSegmentDesc(name) [P2P 分支]
-> 解析 name 为 ip + port
-> 编码本地 LOCAL_SEGMENT_ID 描述符为 JSON
-> handshake_plugin_->exchangeMetadata(ip, port, local_json, peer_json)
-> 解码对端返回的 JSON 为 SegmentDesc
-> 分配 ID,缓存,返回
P2P 模式中没有服务发现 —— 调用方必须事先知道对端地址。
六、数据流全景图
应用层:
TransferEngine::init("etcd://10.0.0.1:2379", "10.0.0.1:12345")
TransferEngine::registerLocalMemory(addr, len, ...)
TransferEngine::openSegment("10.0.0.2:12345")
TransferEngineImpl:
├── init() → metadata_->addRpcMetaEntry()
├── registerLocalMemory() → transport->registerLocalMemory()
│ └── 内部调用 metadata_->addLocalMemoryBuffer()
│ └── updateLocalSegmentDesc() → storage_plugin_->set(...)
└── openSegment() → metadata_->getSegmentID()
└── getSegmentDesc() → storage_plugin_->get(...)
└── decodeSegmentDesc() → SegmentDesc → 缓存
TransferMetadata (元数据中枢):
├── 本地缓存: segment_name_to_id_map_ / segment_id_to_desc_map_
├── 集中式: MetadataStoragePlugin (etcd/redis/http)
└── P2P: HandShakePlugin (SocketHandShakePlugin, TCP 直连)
MultiTransport (传输层):
└── selectTransport(segment_desc.protocol) → RdmaTransport/TcpTransport/...
七、关键设计要点
-
多后端可插拔:
MetadataStoragePlugin纯虚接口让 etcd/Redis/HTTP 可以任意切换,新增后端只需实现 get/set/remove 三个方法。 -
CoW 模式更新 Segment 描述符:
addLocalMemoryBuffer使用 Copy-on-Write 方式追加 buffer —— 创建新的shared_ptr<SegmentDesc>拷贝,追加 buffer 后替换旧指针。这样读操作不会被写阻塞。 -
RWSpinlock 保护缓存: 两种 map 的读写使用读写自旋锁,读多写少的场景下性能良好。网络 I/O 操作(
storage_plugin_->get)在锁外执行,避免长时间持锁。 -
原子 ID 分配:
next_segment_id_是std::atomic<uint64_t>,通过fetch_add(1)无锁分配。本地 Segment 固定为LOCAL_SEGMENT_ID = 0。 -
Go-C++ CGo 桥接: etcd 集成通过 Go 编译为 C 共享库实现,CGo 层的内存管理需要注意 ——
EtcdGetWrapper返回的*C.char需要 C++ 侧free()。 -
Metacache 默认开启: 获取 Segment 描述符默认走本地缓存,避免每次
getSegmentDescByName都产生网络 I/O。需要实时性时可以传force_update=true或调用syncSegmentCache()。 -
集群隔离: 通过
MC_METADATA_CLUSTER_ID环境变量为 key 添加自定义前缀,多个 Mooncake 集群可以共享同一个 etcd 实例而不冲突。
八、代码验证
以下是验证上述分析的源码位置汇总:
引擎初始化:
mooncake-transfer-engine/src/transfer_engine_impl.cpp第 77-368 行 ——TransferEngineImpl::init()完整流程- 第 192 行:创建
TransferMetadata;第 202 行:调用addRpcMetaEntry发布 RPC 地址
元数据核心:
mooncake-transfer-engine/include/transfer_metadata.h第 44-234 行 ——TransferMetadata类定义、SegmentDesc/BufferDesc/DeviceDesc结构体mooncake-transfer-engine/src/transfer_metadata.cpp第 130-165 行 —— 构造函数,解析协议前缀,创建插件- 第 169-184 行 ——
getFullMetadataKey()key 格式 - 第 423-427 行 ——
encodeSegmentDesc中 nvlink/nvlink_intra/hip/maca/ubshmem 共用分支 - 第 459-478 行 ——
updateSegmentDesc()序列化并写入存储 - 第 767-777 行 ——
decodeSegmentDesc中 nvmeof 解码分支 - 第 865-901 行 ——
getSegmentDesc()集中式/P2P 分支 - 第 938-974 行 ——
getSegmentDescByName()缓存逻辑 - 第 1004-1024 行 ——
getSegmentID()分配 ID + 缓存 - 第 1059-1101 行 ——
addLocalMemoryBuffer/removeLocalMemoryBufferCoW 操作
存储插件:
mooncake-transfer-engine/include/transfer_metadata_plugin.h第 21-80 行 —— 插件抽象接口mooncake-transfer-engine/src/transfer_metadata_plugin.cpp第 447-520 行 —— EtcdStoragePlugin(CGo 桥接版本)- 第 69-204 行 —— RedisStoragePlugin
- 第 207-391 行 —— HTTPStoragePlugin
- 第 523-540 行 ——
parseConnectionString(无://默认 etcd) - 第 542-593 行 ——
MetadataStoragePlugin::Create()工厂方法(未知协议调用 LOG(FATAL))
Go etcd 桥接:
mooncake-common/etcd/etcd_wrapper.go第 85-127 行 —— TE 用的 etcd client(NewEtcdClient、Get、Put、Delete、Close)
Segment 注册调用链:
mooncake-transfer-engine/src/transfer_engine_impl.cpp第 511-533 行 ——registerLocalMemory()遍历所有 transport 注册- 第 453-476 行 ——
openSegment()入口
P2P 握手:
mooncake-transfer-engine/include/common.h第 59-66 行 ——HandShakeRequestType枚举(Connection=0, Metadata=1, Notify=2, Probe=3, OldProtocol=0xff)- 第 443-449 行 ——
readString中 OldProtocol 解码回退逻辑(首字节 > 3 时触发) mooncake-transfer-engine/src/transfer_metadata_plugin.cpp第 616-1177 行 ——SocketHandShakePlugin完整实现mooncake-transfer-engine/src/transfer_metadata.cpp第 154-158 行 —— P2P 模式判断
九、修订说明
本文档基于审查记录 ./10-review-notes.md 完成修订,共修正 1 处事实错误 和 3 处描述不精确:
-
事实错误修正 — 4.4 节插件选择逻辑:原文称 “其他 -> etcd (默认回退)“,与
MetadataStoragePlugin::Create()(transfer_metadata_plugin.cpp:590) 的实际行为不符。修正为两步描述:parseConnectionString将无://前缀默认视为 etcd;Create工厂对未匹配的已知协议调用LOG(FATAL)终止程序。 -
描述精确化 — 三、SegmentDesc 协议编解码分组:补充了
ubshmem编码分支 (transfer_metadata.cpp:423-427) 和nvmeof解码分支 (transfer_metadata.cpp:767-777),并标注 nvmeof 仅在解码阶段有对应路径。 -
描述精确化 — 5.1 节 P2P 消息类型:原文将
OldProtocol与Connection混列为一种消息类型。修正为三类在线消息类型 (Connection/Metadata/Notify/Probe, 枚举值 0-3) + 一个解码回退路径 (OldProtocol=0xff),并解释readString(common.h:443-449) 中的触发条件和服务端处理差异。 -
行号修正 — 二/八节
init()行号范围:从77-202更正为77-368(函数在 transfer_engine_impl.cpp 中延伸至 368 行)。此外 RedisStoragePlugin 结束行从 205 订正为 204、EtcdStoragePlugin 结束行从 521 订正为 520、etcd_wrapper.go TE 客户端引用范围从 85-198 订正为 85-127。