Skip to content
团子云技术 Lite 1.048596
Go back

Mooncake TE 阅读手记-05-最小 C++ 示例全解

团团虾导读:从代码出发的实战笔记。以一个完整的 minimal_example.cpp 为起点,逐行走读 target 节点(注册内存→等待 RDMA 操作)和 initiator 节点(打开远端 Segment→构造 TransferRequest→submit→轮询完成)的完整流程。关键洞察:initiator 也需要 registerLocalMemory,因为 RDMA 操作同时需要两端的 lkey 和 rkey。

第一篇:最小 C++ 示例全解

这是一个端到端、可直接编译运行的 C++ 示例,展示了一个 target 节点和一个 initiator 节点之间的数据传输。源文件位于:

mooncake-transfer-engine/example/minimal_example.cpp

完整代码

// Minimal Mooncake Transfer Engine example
//
// Build: add to mooncake-transfer-engine/example/CMakeLists.txt:
//   add_executable(minimal_example ${WORKSPACE}/minimal_example.cpp)
//   target_link_libraries(minimal_example PUBLIC transfer_engine)
//
// Usage:
//   Target node:  ./minimal_example --mode=target
//   Initiator node: ./minimal_example --mode=initiator --segment_id=<target_hostname>

#include <gflags/gflags.h>
#include <glog/logging.h>

#include <chrono>
#include <csignal>
#include <cstdlib>
#include <memory>
#include <string>
#include <thread>

#include "common.h"
#include "transfer_engine.h"
#include "transport/transport.h"

// Command-line flags (gflags). Both roles read the connection/transport
// flags; --segment_id is only consulted in initiator mode.

// Selects the role this process plays: "target" or "initiator".
DEFINE_string(mode, "target", "Mode: target | initiator");
// Metadata service endpoint passed to TransferEngine::init by both roles.
// NOTE(review): presumably an etcd endpoint (2379 is etcd's default client
// port); the hard-coded IP looks lab-specific and likely needs overriding.
DEFINE_string(metadata_server, "192.168.3.77:2379", "Metadata server address");
// Name this node registers under; split into host and RPC port by
// parseHostNameWithPort. Initiators pass the target's value as --segment_id.
DEFINE_string(local_server_name, mooncake::getHostname(), "Local server name");
// Transport to install; the role functions handle only "tcp" and "rdma".
DEFINE_string(protocol, "tcp", "Transport protocol: rdma | tcp");
// HCA name placed into the NIC priority matrix when --protocol=rdma.
DEFINE_string(device_name, "", "Device name (for RDMA)");
// Initiator only: name of the target segment to open.
DEFINE_string(segment_id, "", "Segment ID of the target node (initiator only)");
// Bytes each role allocates on NUMA node 0 and registers with the engine.
DEFINE_uint64(buffer_size, 1ULL << 30, "Buffer size (1GB default)");

// Example code: use mooncake:: names (TransferEngine, SegmentID, ...) unqualified.
using namespace mooncake;

// Bytes moved by the initiator's single WRITE request (64 KiB).
static const size_t kBlockSize = 65536;

int runTarget() {
    auto engine = std::make_unique<TransferEngine>(false);

    auto [host, port] = parseHostNameWithPort(FLAGS_local_server_name);
    engine->init(FLAGS_metadata_server, FLAGS_local_server_name,
                 host.c_str(), port);

    // Install transport
    if (FLAGS_protocol == "tcp") {
        engine->installTransport("tcp", nullptr);
    } else if (FLAGS_protocol == "rdma") {
        std::string matrix = "{\"cpu:0\": [[\"" + FLAGS_device_name +
                             "\"], []], \"cpu:1\": [[\"" + FLAGS_device_name +
                             "\"], []]}";
        void** args = (void**)malloc(2 * sizeof(void*));
        args[0] = (void*)matrix.c_str();
        args[1] = nullptr;
        engine->installTransport("rdma", args);
        free(args);
    }

    // Allocate and register memory
    void* buffer = numa_alloc_onnode(FLAGS_buffer_size, 0);
    int rc = engine->registerLocalMemory(buffer, FLAGS_buffer_size, "cpu:0");
    LOG_ASSERT(!rc);

    LOG(INFO) << "Target ready. Segment: " << FLAGS_local_server_name
              << ", buffer: " << buffer << ", size: " << FLAGS_buffer_size;

    while (true) std::this_thread::sleep_for(std::chrono::seconds(1));

    engine->unregisterLocalMemory(buffer);
    numa_free(buffer, FLAGS_buffer_size);
    return 0;
}

// Initiator role: open the target's segment, WRITE kBlockSize bytes from a
// local NUMA-node-0 buffer into the first buffer the target registered, and
// poll until the transfer finishes.
//
// The local buffer is registered too. An RDMA WRITE needs the lkey of the
// local source buffer in addition to the rkey the target published.
//
// Returns 0 on success. Setup or transfer failures abort via LOG_ASSERT /
// LOG(FATAL).
int runInitiator() {
    LOG_ASSERT(!FLAGS_segment_id.empty())
        << "--segment_id is required for initiator mode";
    // The single request below reads kBlockSize bytes starting at the local
    // buffer, so the buffer must be at least that large.
    LOG_ASSERT(FLAGS_buffer_size >= kBlockSize)
        << "--buffer_size must be at least " << kBlockSize << " bytes";

    auto engine = std::make_unique<TransferEngine>(false);

    auto [host, port] = parseHostNameWithPort(FLAGS_local_server_name);
    int rc = engine->init(FLAGS_metadata_server, FLAGS_local_server_name,
                          host.c_str(), port);
    LOG_ASSERT(!rc) << "TransferEngine::init failed, rc=" << rc;

    // Install transport (same as target); unknown protocols are fatal.
    bool transport_ok = false;
    if (FLAGS_protocol == "tcp") {
        transport_ok = engine->installTransport("tcp", nullptr) != nullptr;
    } else if (FLAGS_protocol == "rdma") {
        LOG_ASSERT(!FLAGS_device_name.empty())
            << "--device_name is required when --protocol=rdma";
        // NIC priority matrix: map both NUMA locations (cpu:0, cpu:1) to the
        // HCA named by --device_name.
        const std::string matrix = "{\"cpu:0\": [[\"" + FLAGS_device_name +
                                   "\"], []], \"cpu:1\": [[\"" +
                                   FLAGS_device_name + "\"], []]}";
        // nullptr-terminated argument array. Both it and `matrix` are
        // destroyed right after the call, so this assumes installTransport
        // consumes the matrix before returning.
        void* args[2] = {const_cast<char*>(matrix.c_str()), nullptr};
        transport_ok = engine->installTransport("rdma", args) != nullptr;
    } else {
        LOG(FATAL) << "Unsupported --protocol: " << FLAGS_protocol;
    }
    LOG_ASSERT(transport_ok) << "Failed to install " << FLAGS_protocol
                             << " transport";

    // Allocate and register the local source buffer.
    void* buffer = numa_alloc_onnode(FLAGS_buffer_size, 0);
    LOG_ASSERT(buffer) << "numa_alloc_onnode failed for " << FLAGS_buffer_size
                       << " bytes";
    rc = engine->registerLocalMemory(buffer, FLAGS_buffer_size, "cpu:0");
    LOG_ASSERT(!rc) << "registerLocalMemory failed, rc=" << rc;

    // Open remote segment
    SegmentID segment_id = engine->openSegment(FLAGS_segment_id);
    LOG(INFO) << "Opened segment: " << FLAGS_segment_id
              << ", segment_id: " << segment_id;

    // Look up the buffers the target published in its segment metadata. A
    // null descriptor also covers a segment that could not be opened.
    auto seg_desc = engine->getMetadata()->getSegmentDescByID(segment_id);
    LOG_ASSERT(seg_desc && !seg_desc->buffers.empty())
        << "Segment " << FLAGS_segment_id
        << " not found or has no registered buffers";
    const auto& remote_buf = seg_desc->buffers[0];
    // Refuse to write past the end of the target's buffer.
    LOG_ASSERT(remote_buf.length >= kBlockSize)
        << "Remote buffer too small: " << remote_buf.length << " bytes";
    uint64_t remote_addr = (uint64_t)remote_buf.addr;

    // Allocate a batch and submit a single WRITE transfer
    BatchID batch_id = engine->allocateBatchID(1);

    TransferRequest req;
    req.opcode = TransferRequest::WRITE;
    req.source = buffer;
    req.target_id = segment_id;
    // target_offset carries the absolute remote address from the BufferDesc.
    req.target_offset = remote_addr;
    req.length = kBlockSize;

    Status s = engine->submitTransfer(batch_id, {req});
    LOG_ASSERT(s.ok()) << "submitTransfer failed: " << s.ToString();

    // Poll task 0 of the batch until it is COMPLETED (or abort on FAILED).
    TransferStatus status;
    while (true) {
        s = engine->getTransferStatus(batch_id, 0, status);
        LOG_ASSERT(s.ok()) << "getTransferStatus failed: " << s.ToString();
        if (status.s == TransferStatusEnum::COMPLETED) break;
        if (status.s == TransferStatusEnum::FAILED)
            LOG(FATAL) << "Transfer failed";
    }

    LOG(INFO) << "Transfer completed: " << status.transferred_bytes << " bytes";

    engine->freeBatchID(batch_id);
    engine->unregisterLocalMemory(buffer);
    numa_free(buffer, FLAGS_buffer_size);
    return 0;
}

// Program entry: parse gflags (leaving argv untouched) and dispatch to the
// role chosen by --mode. Any other mode value is fatal.
int main(int argc, char** argv) {
    gflags::ParseCommandLineFlags(&argc, &argv, /*remove_flags=*/false);

    const std::string& role = FLAGS_mode;
    if (role == "initiator") return runInitiator();
    if (role == "target") return runTarget();

    LOG(FATAL) << "Unknown mode: " << FLAGS_mode;
}

代码流程分析

整个代码的核心流程分为两个角色:Target 与 Initiator。

Target 节点的步骤:

  1. TransferEngine(false) — 创建引擎实例(false 表示不自动发现拓扑)
  2. engine->init(...) — 连接 etcd 元数据服务器,初始化本地 RPC 端点,注册到元数据中心
  3. engine->installTransport("rdma", ...) — 安装 RDMA 传输层,指定 HCA 设备与 NUMA 拓扑的绑定矩阵
  4. numa_alloc_onnode(FLAGS_buffer_size, 0) 在 NUMA 节点 0 上分配内存,随后 engine->registerLocalMemory(buffer, FLAGS_buffer_size, "cpu:0") 将其注册(RDMA 模式下注册到 RNIC),并将包含 addr、rkey、lkey 的 BufferDesc 写入 etcd
  5. 进入等待循环,保持进程及已注册的内存存活;远端发起的 RDMA 读写是单边操作,不需要 target 侧代码主动参与

Initiator 节点的步骤:

  1-4. 与 target 相同(初始化、安装传输、注册本地内存)
  5. engine->openSegment(FLAGS_segment_id) — 从 etcd 获取远端节点的 SegmentDesc 并分配本地 segment ID
  6. engine->getMetadata()->getSegmentDescByID(segment_id) — 获取远端节点的完整元数据(含 buffers[].addr、rkey 等)
  7. 构造 TransferRequest,执行 submitTransfer,并轮询 getTransferStatus 直到完成

关键点:initiator 也需要注册本地内存。RDMA 操作同时需要两把密钥:target 侧的 registerLocalMemory 只为目标 buffer 提供 rkey(远端内存密钥),而 initiator 的 source buffer 必须自己注册才能获得 lkey(本地内存密钥)——两者缺一不可。



Share this post on:

Previous Post
Mooncake TE 阅读手记-06-Segment 与元数据发现
Next Post
Mooncake TE 阅读手记-04-Mooncake Store 分层 KV 存储