Skip to content
团子云技术 Lite 1.048596
Go back

Mooncake TE 阅读手记-12-Transport 核心概念与线程模型

团团虾导读:进入 TE 的内部调度机制。从 TransferRequest → Batch → TransferTask → Slice 的四层结构讲起,然后分别拆解 RDMA Transport(WorkerPool + RdmaContext + RdmaEndPoint)和 TCP Transport(asio 异步模型 + ServerSession)的线程模型。关键对比:RDMA 走轮询 + shard 队列,TCP 走 asio 事件回调。

文章二:Transport 内部核心概念与线程模型

RDMA 硬核原语:QP、MR、CQ、Endpoint

Transport 层封装了 RDMA 编程模型的核心概念。在 Mooncake 的实现中,这些概念分布在以下类中:

MR — Memory Region(内存区域)

一块注册到 RDMA 网卡的内存。注册后操作系统会锁定(pin)这块内存防止被 swap 出去,同时 RNIC 分配 lkey(local key,本地操作使用)和 rkey(remote key,远端访问需要)。

代码位置mooncake-transfer-engine/include/transport/rdma_transport/rdma_context.h:56-61 定义了 MemoryRegionMeta 结构体:

struct MemoryRegionMeta {
    void *addr;        // 注册内存的起始地址
    struct ibv_mr *mr; // libibverbs 的 Memory Region 对象
};

注册流程mooncake-transfer-engine/src/transport/rdma_transport/rdma_transport.cpp:183-304registerLocalMemoryInternal() 方法中,对每个 RDMA Context(每个 NIC 一个),调用 context->registerMemoryRegion() 将内存注册到对应网卡,然后收集所有 context 的 lkey 和 rkey 存入 BufferDesc

QP — Queue Pair(队列对)

RDMA 通信的基本单元。每个 QP 包含 Send Queue(SQ)和 Receive Queue(RQ),以及一个 Completion Queue(CQ)。Mooncake 中,QP 被 Endpoint 管理。

代码位置mooncake-transfer-engine/include/transport/rdma_transport/rdma_endpoint.hRdmaEndPoint 类包含 qp_list_std::vector<ibv_qp *> 类型),一个 Endpoint 可以有多个 QP。

QP 数量配置:globalConfig().num_qp_per_ep(可配置),默认值为 2(config.h:43)。

CQ — Completion Queue(完成队列)

QP 上的操作完成后,Work Completion(WC)会放入 CQ。Worker 线程通过轮询 CQ 获取完成状态。

代码位置mooncake-transfer-engine/include/transport/rdma_transport/rdma_context.h:50-54 定义了 RdmaCq

struct RdmaCq {
    ibv_cq *native;           // libibverbs CQ 对象
    volatile int outstanding;  // 未完成操作计数
};

Worker 线程通过 ibv_poll_cq() 轮询 CQ,提取 WC,根据 WC 的 wr_id(即 Slice 指针)标记 Slice 完成。

Endpoint(端点)

表示本地 NIC 到远端 NIC 的一条”连接”。一个 Endpoint 管理多个 QP、负责握手(HandShake)、重连等。Endpoint 通过 EndpointStore 缓存,避免频繁创建销毁。

代码位置mooncake-transfer-engine/include/transport/rdma_transport/rdma_context.h:108 中的 endpoint() 方法从 EndpointStore 获取或创建 Endpoint。

EndpointStore 支持两种淘汰策略:FIFO(先进先出)和 SIEVE(clock with quick demotion,NSDI 24 论文算法),通过 endpoint_store_type 配置选择(默认 SIEVE)。

Retry(重试)

当 RDMA 操作失败(如设备故障、QP 进入错误态),Slice 的 retry_cnt 递增,Worker 将失败的 Slice 重新分发。超过最大重试次数后标记为 FAILED。

代码位置mooncake-transfer-engine/src/transport/rdma_transport/worker_pool.cpp:325-333

slice->rdma.retry_cnt++;
if (slice->rdma.retry_cnt >= slice->rdma.max_retry_cnt) {
    slice->markFailed();
    processed_slice_count_++;
} else {
    // 重新入队,等待重试
    collective_slice_queue_[thread_id][slice->peer_nic_path]
        .push_back(slice);
    redispatch_counter_++;
}

最大重试次数由 globalConfig().retry_cnt 控制(默认值 9,config.h:51)。

线程模型:分层设计

TE 的线程模型分为三层:

┌─────────────────────────────────────────────────┐
│ TransferEngine(无工作线程)                       │
│ TransferEngineImpl(仅 metrics 线程,可选)         │
│ MultiTransport(纯同步路由,无线程)                │
└──────────────┬──────────────────────────────────┘

┌──────────────▼──────────────────────────────────┐
│ Transport 层(每个 Transport 自己管理线程)         │
│                                                  │
│  RdmaTransport:                                  │
│    RdmaContext[0] → WorkerPool                   │
│      transferWorker[0] → performPostSend +      │
│                           performPollCq          │
│      transferWorker[1]                          │
│      transferWorker[2]                          │
│      ...               (workers_per_ctx 个)       │
│      monitorWorker[0]                           │
│                                                  │
│  TcpTransport:                                   │
│    asio::io_context(共 1 个)                     │
│    worker 线程(1 个 thread_,运行 io_context.run())│
│    基于 asio 异步调度                             │
└──────────────────────────────────────────────────┘

1. TransferEngine 层 — 无线程

TransferEngineTransferEngineImpl 不创建任何工作线程(除可选的 metrics 上报线程),所有方法都是同步调用的。submitTransfer() 只是把请求分发下去,不阻塞。

2. MultiTransport — 纯路由层

MultiTransport 只有一个职责:根据请求的 target_id 查找远端 Segment 的 protocol 字段,映射到对应的 Transport 实例,然后调用该 Transport 的 submitTransferTask()

以下为伪代码,展示核心路由逻辑(实际代码见 multi_transport.cpp:107-146):

// 伪代码:展示 MultiTransport 核心路由逻辑
Status MultiTransport::submitTransfer(BatchID batch_id,
                                       const std::vector<TransferRequest>& entries) {
    // 按 Transport 分组
    std::unordered_map<Transport*, std::vector<Transport::TransferTask*>> submit_tasks;
    for (auto& request : entries) {
        Transport* transport = nullptr;
        selectTransport(request, transport);  // 根据 target_id 查 metadata
        submit_tasks[transport].push_back(&task);
    }
    // 分别提交
    for (auto& entry : submit_tasks)
        entry.first->submitTransferTask(entry.second);
}

3. Transport 层 — 自管理线程

RDMA Transport 的 WorkerPool

创建线程数量kTransferWorkerCount = globalConfig().workers_per_ctx,默认值 2(config.h:49)。

WorkerPool 为每个 RdmaContext 创建 workers_per_ctx + 1 个线程(多出 1 个是 monitorWorker)。

代码位置mooncake-transfer-engine/src/transport/rdma_transport/worker_pool.cpp:32-50

WorkerPool::WorkerPool(RdmaContext &context, int numa_socket_id) {
    for (int i = 0; i < kTransferWorkerCount; ++i)
        worker_thread_.emplace_back(
            std::thread(std::bind(&WorkerPool::transferWorker, this, i)));
    worker_thread_.emplace_back(
        std::thread(std::bind(&WorkerPool::monitorWorker, this)));
}

transferWorker 工作循环worker_pool.cpp:391-421):

  1. 检查是否有新的 Slice 到达(submitted_slice_count vs processed_slice_count
  2. 从 8 个 shard queue 中取出 Slice,按 peer_nic_path 分组
  3. 查找或创建对应 Endpoint
  4. 调用 endpoint->submitPostSend() 将 Slice 提交到 RDMA QP
  5. 调用 performPollCq() 轮询 CQ 获取完成事件
  6. 将失败的 Slice 重新分发(redispatch)
  7. 如果所有工作都完成,进入 wait 状态(条件变量),最多等 1 秒

monitorWorker 工作循环worker_pool.cpp:497-526):

  1. 每 1 秒调用 context_.reclaimEndpoints() 清理过期的 Endpoint
  2. 使用 epoll 监听 RDMA async events(QP FATAL、DEVICE FATAL、PORT ACTIVE 等)
  3. 根据事件类型标记 endpoint/context 的 active 状态

TcpTransport

TcpTransport 基于 asio 异步 I/O 模型。在 install() 方法中(tcp_transport.cpp:555-556)显式创建了一个 std::thread thread_ 工作线程,运行 io_context.run() 事件循环。每个 TcpTransport 实例拥有 1 个 asio::io_context 和 1 个 worker 线程(tcp_transport.cpp:436,476),而非每个 CPU socket 一个。

数据通路:应用线程 → Transport Worker 的线程跨越

应用线程 (用户态)                  Transport Worker 线程 (内核/硬件近端)
       │                                │
       │ submitTransfer()               │
       ├─ MultiTransport::selectTransport│
       ├─ RdmaTransport::submitTransferTask│
       │   ├─ 创建 Slice                 │
       │   ├─ 选设备 (selectDevice)      │
       │   ├─ 分组到 context            │
       │   └─ RdmaContext::submitPostSend│
       │       └─ WorkerPool::submitPostSend
       │           └─ 入队到 shard queue │
       │              (无锁 push, TicketLock 保护)
       │                                │
       │                      transferWorker 被唤醒
       │                         ├─ performPostSend
       │                         │   └─ endpoint->submitPostSend (ibv_post_send)
       │                         ├─ performPollCq
       │                         │   └─ ibv_poll_cq → WC → slice->markSuccess()
       │                         └─ 标记 TransferTask::success_slice_count
       │                                │
       │ getTransferStatus()            │
       ├─ 读取 task.success_slice_count │ (原子变量, 无需锁)
       └─ 判断是否全部完成 ◄────────────┘

关键通信机制:

代码验证

应用线程在没有 RTE 模型的情况下如何 poll

mooncake-transfer-engine/example/transfer_engine_bench.cpp:418-432 演示了典型的轮询模式。应用线程在 submitTransfer 之后自主循环调用 getTransferStatus。不依赖任何框架级回调或事件循环。

TransferTask 中的原子变量用于无锁跨线程通信

mooncake-transfer-engine/include/transport/transport.h:283-314TransferTask 的关键字段全部使用 volatile 修饰:

struct TransferTask {
    volatile uint64_t slice_count = 0;
    volatile uint64_t success_slice_count = 0;
    volatile uint64_t failed_slice_count = 0;
    volatile uint64_t transferred_bytes = 0;
    volatile bool is_finished = false;
    // ... 其他非 volatile 字段(total_bytes, batch_id, transport_ 等)
#ifdef USE_EVENT_DRIVEN_COMPLETION
    volatile uint64_t completed_slice_count = 0;
#endif
};

这些变量由 Worker 线程写入(通过 Slice::markSuccess/markFailed,内部使用 __atomic_fetch_add),由应用线程通过 getTransferStatus 读取,形成天然的锁无关生产者-消费者模型。

TcpTransport 基于 asio 异步模型

TcpTransport(mooncake-transfer-engine/include/transport/tcp_transport/tcp_transport.h)基于 asio 异步模型,所有 I/O 操作由 asio::io_context 驱动。在 install() 中显式创建了 1 个 std::threadtcp_transport.cpp:555-556),用于运行 io_context.run() 事件循环。ServerSession(tcp_transport.cpp:63-243)通过 asio 回调链驱动每个 socket 连接上的读-写流程。



Share this post on:

Previous Post
Mooncake TE 阅读手记-13-高性能编程线程模型
Next Post
Mooncake TE 阅读手记-11-TE 接口设计