团团虾导读:进入 TE 的内部调度机制。从 TransferRequest → Batch → TransferTask → Slice 的四层结构讲起,然后分别拆解 RDMA Transport(WorkerPool + RdmaContext + RdmaEndPoint)和 TCP Transport(asio 异步模型 + ServerSession)的线程模型。关键对比:RDMA 走轮询 + shard 队列,TCP 走 asio 事件回调。
文章二:Transport 内部核心概念与线程模型
RDMA 硬核原语:QP、MR、CQ、Endpoint
Transport 层封装了 RDMA 编程模型的核心概念。在 Mooncake 的实现中,这些概念分布在以下类中:
MR — Memory Region(内存区域)
一块注册到 RDMA 网卡的内存。注册后操作系统会锁定(pin)这块内存防止被 swap 出去,同时 RNIC 分配 lkey(local key,本地操作使用)和 rkey(remote key,远端访问需要)。
代码位置:mooncake-transfer-engine/include/transport/rdma_transport/rdma_context.h:56-61 定义了 MemoryRegionMeta 结构体:
struct MemoryRegionMeta {
void *addr; // 注册内存的起始地址
struct ibv_mr *mr; // libibverbs 的 Memory Region 对象
};
注册流程:mooncake-transfer-engine/src/transport/rdma_transport/rdma_transport.cpp:183-304 的 registerLocalMemoryInternal() 方法中,对每个 RDMA Context(每个 NIC 一个),调用 context->registerMemoryRegion() 将内存注册到对应网卡,然后收集所有 context 的 lkey 和 rkey 存入 BufferDesc。
QP — Queue Pair(队列对)
RDMA 通信的基本单元。每个 QP 包含 Send Queue(SQ)和 Receive Queue(RQ),以及一个 Completion Queue(CQ)。Mooncake 中,QP 被 Endpoint 管理。
代码位置:mooncake-transfer-engine/include/transport/rdma_transport/rdma_endpoint.h 中 RdmaEndPoint 类包含 qp_list_(std::vector<ibv_qp *> 类型),一个 Endpoint 可以有多个 QP。
QP 数量配置:globalConfig().num_qp_per_ep(可配置),默认值为 2(config.h:43)。
CQ — Completion Queue(完成队列)
QP 上的操作完成后,Work Completion(WC)会放入 CQ。Worker 线程通过轮询 CQ 获取完成状态。
代码位置:mooncake-transfer-engine/include/transport/rdma_transport/rdma_context.h:50-54 定义了 RdmaCq:
struct RdmaCq {
ibv_cq *native; // libibverbs CQ 对象
volatile int outstanding; // 未完成操作计数
};
Worker 线程通过 ibv_poll_cq() 轮询 CQ,提取 WC,根据 WC 的 wr_id(即 Slice 指针)标记 Slice 完成。
Endpoint(端点)
表示本地 NIC 到远端 NIC 的一条”连接”。一个 Endpoint 管理多个 QP、负责握手(HandShake)、重连等。Endpoint 通过 EndpointStore 缓存,避免频繁创建销毁。
代码位置:mooncake-transfer-engine/include/transport/rdma_transport/rdma_context.h:108 中的 endpoint() 方法从 EndpointStore 获取或创建 Endpoint。
EndpointStore 支持两种淘汰策略:FIFO(先进先出)和 SIEVE(clock with quick demotion,NSDI 24 论文算法),通过 endpoint_store_type 配置选择(默认 SIEVE)。
Retry(重试)
当 RDMA 操作失败(如设备故障、QP 进入错误态),Slice 的 retry_cnt 递增,Worker 将失败的 Slice 重新分发。超过最大重试次数后标记为 FAILED。
代码位置:mooncake-transfer-engine/src/transport/rdma_transport/worker_pool.cpp:325-333:
slice->rdma.retry_cnt++;
if (slice->rdma.retry_cnt >= slice->rdma.max_retry_cnt) {
slice->markFailed();
processed_slice_count_++;
} else {
// 重新入队,等待重试
collective_slice_queue_[thread_id][slice->peer_nic_path]
.push_back(slice);
redispatch_counter_++;
}
最大重试次数由 globalConfig().retry_cnt 控制(默认值 9,config.h:51)。
线程模型:分层设计
TE 的线程模型分为三层:
┌─────────────────────────────────────────────────┐
│ TransferEngine(无工作线程) │
│ TransferEngineImpl(仅 metrics 线程,可选) │
│ MultiTransport(纯同步路由,无线程) │
└──────────────┬──────────────────────────────────┘
│
┌──────────────▼──────────────────────────────────┐
│ Transport 层(每个 Transport 自己管理线程) │
│ │
│ RdmaTransport: │
│ RdmaContext[0] → WorkerPool │
│ transferWorker[0] → performPostSend + │
│ performPollCq │
│ transferWorker[1] │
│ transferWorker[2] │
│ ... (workers_per_ctx 个) │
│ monitorWorker[0] │
│ │
│ TcpTransport: │
│ asio::io_context(共 1 个) │
│ worker 线程(1 个 thread_,运行 io_context.run())│
│ 基于 asio 异步调度 │
└──────────────────────────────────────────────────┘
1. TransferEngine 层 — 无线程
TransferEngine 和 TransferEngineImpl 不创建任何工作线程(除可选的 metrics 上报线程),所有方法都是同步调用的。submitTransfer() 只是把请求分发下去,不阻塞。
2. MultiTransport — 纯路由层
MultiTransport 只有一个职责:根据请求的 target_id 查找远端 Segment 的 protocol 字段,映射到对应的 Transport 实例,然后调用该 Transport 的 submitTransferTask()。
以下为伪代码,展示核心路由逻辑(实际代码见 multi_transport.cpp:107-146):
// 伪代码:展示 MultiTransport 核心路由逻辑
Status MultiTransport::submitTransfer(BatchID batch_id,
const std::vector<TransferRequest>& entries) {
// 按 Transport 分组
std::unordered_map<Transport*, std::vector<Transport::TransferTask*>> submit_tasks;
for (auto& request : entries) {
Transport* transport = nullptr;
selectTransport(request, transport); // 根据 target_id 查 metadata
submit_tasks[transport].push_back(&task);
}
// 分别提交
for (auto& entry : submit_tasks)
entry.first->submitTransferTask(entry.second);
}
3. Transport 层 — 自管理线程
RDMA Transport 的 WorkerPool
创建线程数量:kTransferWorkerCount = globalConfig().workers_per_ctx,默认值 2(config.h:49)。
WorkerPool 为每个 RdmaContext 创建 workers_per_ctx + 1 个线程(多出 1 个是 monitorWorker)。
代码位置:mooncake-transfer-engine/src/transport/rdma_transport/worker_pool.cpp:32-50:
WorkerPool::WorkerPool(RdmaContext &context, int numa_socket_id) {
for (int i = 0; i < kTransferWorkerCount; ++i)
worker_thread_.emplace_back(
std::thread(std::bind(&WorkerPool::transferWorker, this, i)));
worker_thread_.emplace_back(
std::thread(std::bind(&WorkerPool::monitorWorker, this)));
}
transferWorker 工作循环(worker_pool.cpp:391-421):
- 检查是否有新的 Slice 到达(
submitted_slice_count vs processed_slice_count) - 从 8 个 shard queue 中取出 Slice,按 peer_nic_path 分组
- 查找或创建对应 Endpoint
- 调用
endpoint->submitPostSend()将 Slice 提交到 RDMA QP - 调用
performPollCq()轮询 CQ 获取完成事件 - 将失败的 Slice 重新分发(redispatch)
- 如果所有工作都完成,进入 wait 状态(条件变量),最多等 1 秒
monitorWorker 工作循环(worker_pool.cpp:497-526):
- 每 1 秒调用
context_.reclaimEndpoints()清理过期的 Endpoint - 使用 epoll 监听 RDMA async events(QP FATAL、DEVICE FATAL、PORT ACTIVE 等)
- 根据事件类型标记 endpoint/context 的 active 状态
TcpTransport
TcpTransport 基于 asio 异步 I/O 模型。在 install() 方法中(tcp_transport.cpp:555-556)显式创建了一个 std::thread thread_ 工作线程,运行 io_context.run() 事件循环。每个 TcpTransport 实例拥有 1 个 asio::io_context 和 1 个 worker 线程(tcp_transport.cpp:436,476),而非每个 CPU socket 一个。
数据通路:应用线程 → Transport Worker 的线程跨越
应用线程 (用户态) Transport Worker 线程 (内核/硬件近端)
│ │
│ submitTransfer() │
├─ MultiTransport::selectTransport│
├─ RdmaTransport::submitTransferTask│
│ ├─ 创建 Slice │
│ ├─ 选设备 (selectDevice) │
│ ├─ 分组到 context │
│ └─ RdmaContext::submitPostSend│
│ └─ WorkerPool::submitPostSend
│ └─ 入队到 shard queue │
│ (无锁 push, TicketLock 保护)
│ │
│ transferWorker 被唤醒
│ ├─ performPostSend
│ │ └─ endpoint->submitPostSend (ibv_post_send)
│ ├─ performPollCq
│ │ └─ ibv_poll_cq → WC → slice->markSuccess()
│ └─ 标记 TransferTask::success_slice_count
│ │
│ getTransferStatus() │
├─ 读取 task.success_slice_count │ (原子变量, 无需锁)
└─ 判断是否全部完成 ◄────────────┘
关键通信机制:
- 应用线程 → Worker 线程:通过 8 个 shard queue,每个 queue 有独立的 TicketLock,减少竞争
- Worker 线程 → 应用线程:通过 TransferTask 中的原子变量(
success_slice_count,failed_slice_count),不需要锁
代码验证
应用线程在没有 RTE 模型的情况下如何 poll:
mooncake-transfer-engine/example/transfer_engine_bench.cpp:418-432 演示了典型的轮询模式。应用线程在 submitTransfer 之后自主循环调用 getTransferStatus。不依赖任何框架级回调或事件循环。
TransferTask 中的原子变量用于无锁跨线程通信:
mooncake-transfer-engine/include/transport/transport.h:283-314 中 TransferTask 的关键字段全部使用 volatile 修饰:
struct TransferTask {
volatile uint64_t slice_count = 0;
volatile uint64_t success_slice_count = 0;
volatile uint64_t failed_slice_count = 0;
volatile uint64_t transferred_bytes = 0;
volatile bool is_finished = false;
// ... 其他非 volatile 字段(total_bytes, batch_id, transport_ 等)
#ifdef USE_EVENT_DRIVEN_COMPLETION
volatile uint64_t completed_slice_count = 0;
#endif
};
这些变量由 Worker 线程写入(通过 Slice::markSuccess/markFailed,内部使用 __atomic_fetch_add),由应用线程通过 getTransferStatus 读取,形成天然的锁无关生产者-消费者模型。
TcpTransport 基于 asio 异步模型:
TcpTransport(mooncake-transfer-engine/include/transport/tcp_transport/tcp_transport.h)基于 asio 异步模型,所有 I/O 操作由 asio::io_context 驱动。在 install() 中显式创建了 1 个 std::thread(tcp_transport.cpp:555-556),用于运行 io_context.run() 事件循环。ServerSession(tcp_transport.cpp:63-243)通过 asio 回调链驱动每个 socket 连接上的读-写流程。