Skip to content
团子云技术 Lite 1.048596
Go back

Mooncake TE 阅读手记-10-QP Depth 与 Slice 处理

团团虾导读:LLM 推理的 KV Cache 传输动辄几百 MB,如果一次性提交会造成 QP 溢出。Mooncake 的解法是把大请求按 64KB 切片,配合两级反压(QP Depth + CQ 水位)控制提交节奏。这篇从 Slice 结构体讲起,拆解了旧路径和 TENT 新路径两套调度机制。

阅读版本: Mooncake v0.3.10.post2-104-geaf724ab (commit eaf724ab, 2026-05-20)

Mooncake QP Depth 与 RDMA Slice 处理机制

一、为什么需要 QP Depth

RDMA 传输的基本单位是 Work Request (WR)。上层提交 WR 到 QP (Queue Pair) 的 Send Queue,硬件异步取出并执行,完成后往 CQ (Completion Queue) 写入 WC (Work Completion)。如果提交速度超过网卡消化速度,WR 会在 QP 里堆积直至溢出 — 这就是 QP Depth 要解决的问题。

Mooncake 需要在高吞吐的 KV Cache 传输场景下保证:

为此,Mooncake 设计了一套两级反压 + 指针追踪的 QP Depth 机制。

二、Slice - 传输的基本调度单元

在大模型 KV Cache 传输场景中,一个 TransferRequest 往往携带几百 KB 到几 MB 的数据。Mooncake 会把一个大块请求按 slice_size(默认 65536 字节,见 config.h:50,在 tent 内部通过 params_->workers.block_size 使用)切分成多个 Slice,每个 Slice 对应一个 RDMA WR。

2.1 旧传输路径(Transport::Slice)

文件:mooncake-transfer-engine/include/transport/transport.h

struct Slice {
    void *source_addr;          // 本地数据地址
    size_t length;              // 本次传输字节数
    TransferRequest::OpCode opcode;   // READ or WRITE
    SegmentID target_id;        // 远端 Segment ID
    std::string peer_nic_path;  // 远端 NIC 路径
    SliceStatus status;         // 当前状态 (PENDING/POSTED/SUCCESS/TIMEOUT/FAILED)
    TransferTask *task;         // 所属传输任务
    std::vector<uint32_t> dest_rkeys;  // 远端 rkey 列表
    bool from_cache;            // 是否来自 Slice 缓存池

    union {
        struct {
            uint64_t dest_addr;       // 远端目标地址 = target_offset + offset
            uint32_t source_lkey;     // 本地内存 lkey
            uint32_t dest_rkey;       // 远端内存 rkey
            int lkey_index;           // lkey 在 MR 列表中的索引
            int rkey_index;           // rkey 在 MR 列表中的索引
            volatile int *qp_depth;   // ★ 指向 wr_depth_list_[qp_index] 的指针
            uint32_t retry_cnt;       // 当前重试次数
            uint32_t max_retry_cnt;   // 最大重试次数
        } rdma;
        // ... 其他传输协议
    };
};

注意:以上展示了 RDMA 传输相关的所有字段,union 中省略了 TCP、NVMe-oF、CXL 等其他传输协议分支结构。完整定义见 transport.h:104-165

关键字段解析:

2.2 新传输路径(tent::RdmaSlice)

文件:mooncake-transfer-engine/tent/include/tent/transport/rdma/slice.h

struct RdmaSlice {
    void* source_addr = nullptr;      // 本地数据地址
    uint64_t target_addr = 0;         // 远端目标地址
    size_t length = 0;                // 传输长度

    RdmaTask* task = nullptr;         // 所属任务
    RdmaSlice* next = nullptr;        // 链表指针(切片链)

    uint32_t source_lkey = 0;         // 本地 lkey
    uint32_t target_rkey = 0;         // 远端 rkey
    int source_dev_id = -1;           // 本地 device ID
    int target_dev_id = -1;           // 远端 device ID

    std::weak_ptr<RdmaEndPoint> ep_weak_ptr;  // 所属 endpoint 弱引用
    TransferStatusEnum word = INITIAL;        // 完成状态
    int qp_index = 0;                // 绑定的 QP 索引
    int retry_count = 0;             // 重试计数
    bool failed = false;             // 提交失败标记
    uint64_t enqueue_ts = 0;         // 入队时间戳
    uint64_t submit_ts = 0;          // 提交时间戳
    RailMonitor* rail_monitor = nullptr;  // 设备对健康监控器
};

新传输路径的 Slice 在前者基础上做了几项改进:

2.3 Slice 的创建流程

在 tent 传输中(rdma_transport.cpp:318-413),Slice 的创建遵循以下过程:

1. 对每个 TransferRequest
   ├── 计算切片数: num_slices = min(max(1, length/block_size), max_slice_count)
   ├── 尾块合并: 若尾部 < block_size * 0.25,合并到最后一块
   ├── 实际块大小: block_size = roundup(length/num_slices, default_block_size)
   └── 循环创建每个 Slice:
       ├── source_addr = request.source + offset
       ├── target_addr = request.target_offset + offset
       ├── length = min(remaining, block_size)
       ├── task 指针关联
       ├── retry_count = 0
       └── 插入到 worker 对应的 Slice 链表中
2. 将所有 Slice 链表提交到 Workers

三、QP Depth 的两级反压体系

3.1 第一级 — QP 级别

每个 QP 有一个 wr_depth_list_[qp_index] 计数器,记录该 QP 上当前有多少个已提交但未完成的 WR。

提交前检查:

int qp_avail = max_wr_depth_ - wr_depth_list_[qp_index];
if (qp_avail <= 0) continue;  // QP 已满,跳过

这一级防止单个 QP 被过度提交导致 ibv_post_send 失败。

3.2 第二级 — CQ 级别

每个 CQ 有一个 cq_outstanding_ 计数器,记录该 CQ 上所有 QP 的未完成 WR 总数。

提交前检查:

int cq_remaining = max_cqe - *cq_outstanding_;
// ...
wr_count = min(wr_count, cq_remaining);

这一级防止 CQ 溢出导致 WC 丢失。这是硬件硬限制 — ibv_create_cq 时创建的 CQE 数量是固定的。

3.3 提交时的分配策略

在旧路径 submitPostSendrdma_endpoint.cpp:578-661)中,Slice 被按比例分配到各个 QP:

// 将剩余 slices 均匀分配到剩余的 QP 上
size_t chunk = (remaining_slices + remaining_qps - 1) / remaining_qps;
int wr_count = min(assigned_count, qp_avail);  // 再受 QP 可用度约束
wr_count = min(wr_count, cq_remaining);         // 再受 CQ 可用度约束

// 提交
__sync_fetch_and_add(&wr_depth_list_[qp_index], wr_count);
__sync_fetch_and_add(cq_outstanding_, wr_count);
ibv_post_send(qp_list_[qp_index], wr_list.data(), &bad_wr);

新路径 submitSlicestent endpoint.cpp:604-673)更简洁:

int wr_count = min(
    cq->maxCqe() - cq->getQuota(),          // CQ 剩余空间
    min(params_->max_qp_wr - wr_depth_list_[qp_index].value,  // QP 剩余空间
        (int)slice_list.size())             // 待提交的 slice 数量
);

reserveQuota(qp_index, wr_count);  // 原子增加 wr_depth 和 CQ quota
ibv_post_send(qp_list_[qp_index], wr_list.data(), &bad_wr);

新路径的 reserveQuotaendpoint.cpp:732-743)同时维护 QP 深度和 CQ 配额:

bool RdmaEndPoint::reserveQuota(int qp_index, int num_entries) {
    auto cq = context_->cq(qp_index % context_->cqCount());
    if (!cq->reserveQuota(num_entries)) return false;  // CQ 配额不足
    auto prev = __sync_fetch_and_add(&wr_depth_list_[qp_index].value, num_entries);
    __sync_fetch_and_add(&inflight_slices_, num_entries);
    if (prev + num_entries > params_->max_qp_wr) {
        cancelQuota(qp_index, num_entries);  // 超额了,回退
        return false;
    }
    return true;
}

四、“回邮地址” — QP Depth 指针追踪

这是 Mooncake 最精妙的设计之一。

4.1 提交侧:在 Slice 里埋下指针

// 老路径
slice->rdma.qp_depth = &wr_depth_list_[qp_index];

// 新路径(隐式)
// slice->qp_index 记录 QP 编号,acknowledge 时通过 qp_index 找到 wr_depth_list_

4.2 完成侧:通过 Slice 指针找到对应计数器

RDMA 硬件完成传输后,往 CQ 写一条 WC,其中 wr_id 就是提交时填入的 Slice* 指针。轮询线程拿到 Slice 后:

老路径worker_pool.cpp:269-349)采用分组批量减:

std::unordered_map<volatile int*, int> qp_depth_set;

for each completion WC:
    Slice* slice = (Slice*)wc[i].wr_id;
    qp_depth_set[slice->rdma.qp_depth]++;  // 按 QP 指针分组计数

// 每个 QP 只做一次原子减法(而非每个 slice 做一次)
for (auto& entry : qp_depth_set)
    __sync_fetch_and_sub(entry.first, entry.second);

源码中第 289 行有一个被注释掉的替代方案:

// __sync_fetch_and_sub(slice->rdma.qp_depth, 1);

这说明作者做过权衡 — 逐片减法会产生大量原子竞争,而分组后每个 QP 只做一次原子操作,在高吞吐场景下性能差异显著。

新路径tent workers.cpp:286-381)在 acknowledge 时处理:

// endpoint::acknowledge()
size_t RdmaEndPoint::acknowledge(RdmaSlice* slice, TransferStatusEnum status) {
    auto qp_index = slice->qp_index;
    auto& queue = slice_queue_[qp_index];
    // 从 BoundedSliceQueue 中按序弹出,直到找到目标 slice
    do {
        current = queue.pop();
        updateSliceStatus(current, status);
    } while (current != slice);
    cancelQuota(qp_index, num_entries);  // 原子减法
}

这里 cancelQuota 同时减少 wr_depth_list_ 和 CQ quota:

void RdmaEndPoint::cancelQuota(int qp_index, int num_entries) {
    __sync_fetch_and_sub(&wr_depth_list_[qp_index].value, num_entries);
    __sync_fetch_and_sub(&inflight_slices_, num_entries);
    cq->cancelQuota(num_entries);
}

4.3 可视化:指针归因流程

提交线程                              硬件 RDMA                              轮询线程
                                                                                 
获取 QP#2 的可用深度                        |                                      |
qp_avail = 256 - wr_depth_list_[2]          |                                      |
= 200(可提交 5 个)                        |                                      |
                                           |                                      |
slice_list[0].rdma.qp_depth =              |                                      |
    &wr_depth_list_[2];  ← 埋指针            |                                      |
                                           |                                      |
__sync_fetch_and_add(                      |                                      |
    &wr_depth_list_[2], 5)                 |                                      |
    → wr_depth_list_[2] = 205             |                                      |
                                           |                                      |
ibv_post_send(QP#2, wr_list, ...) ──────────→ 网卡开始异步执行                     |
                                           |                                      |
                                           |   ... RDMA 传输中 ...                |
                                           |                                      |
                                           ←── ←── WC 写入 CQ                      poll CQ
                                           |        (wr_id = Slice*)              ↓
                                           |                               Slice* s = (Slice*)wc.wr_id;
                                           |                               s->rdma.qp_depth
                                           |                                  ↓
                                           |                               确定是 wr_depth_list_[2]
                                           |                               qp_depth_set[ptr]++
                                           |                                      |
                                           |                               __sync_fetch_and_sub(
                                           |                                   &wr_depth_list_[2], 5)
                                           |                                   → wr_depth_list_[2] = 200

五、Slice 的完整生命周期

以 tent 新传输路径为例,一个 Slice 从创建到完成经历以下阶段:

阶段 1:分配与拆分 (submitTransferTasks)

rdma_transport.cpp:374-404

阶段 2:路径解析 (generatePostPath)

workers.cpp:680-701

阶段 3:分组与端点获取 (asyncPostSend)

workers.cpp:212-283

阶段 4:提交到硬件 (submitSlices)

endpoint.cpp:604-673

阶段 5:轮询完成 (asyncPollCq)

workers.cpp:286-381

阶段 6:状态更新 (updateSliceStatus)

slice.h:101-121

static inline void updateSliceStatus(RdmaSlice* slice, TransferStatusEnum status) {
    // CAS 设置 slice 状态
    if (!__sync_bool_compare_and_swap(&slice->word, PENDING, status)) return;

    if (status == COMPLETED) {
        __sync_fetch_and_add(&task->transferred_bytes, slice->length);
        __sync_fetch_and_add(&task->success_slices, 1);
    } else {
        __sync_bool_compare_and_swap(&task->first_error, PENDING, status);
    }

    // 判断整个任务是否完成
    int resolved = __sync_add_and_fetch(&task->resolved_slices, 1);
    if (resolved >= task->num_slices) {
        // 所有 slice 都处理完毕 → 设置 task->status_word
    }
    task->deref();
}

六、新旧传输路径对比

特性旧路径 (Transport::Slice)新路径 (tent::RdmaSlice)
Slice 队列unordered_map + vector (无界)BoundedMPSCQueue + BoundedSliceQueue
QP 深度存储volatile int* wr_depth_list_WrDepthBlock[].value (带 cache line padding)
QP 归因方式Slice 存指向 wr_depth 的指针Slice 存 qp_index,通过 BoundedSliceQueue 追踪
CQ 配额管理全局 cq_outstanding_ 计数器每个 CQ 独立 Quota 对象
设备选择selectDevice + 一次性selectOptimalDevice + selectFallbackDevice + 重试
重试机制per-slice retry_cnt + max_retry_cnt(均存于 Slice 内)per-slice retry_count + 全局 max_retry_count(存于 params)
健康监控简单计数 RNIC 错误RailMonitor + 设备对粒度
超时处理无显式超时slice_timeout_ns + 软件超时检测
Endpoint 销毁一次性 deconstruct两阶段 beginDestroy/finishDestroy
内存分配new/delete + 对象池Slab 分配器 (RdmaSliceStorage)

七、关键设计要点

  1. 指针追踪 vs 索引追踪:老路径用指针直接指向计数器,零开销归因;新路径用索引配合 BoundedSliceQueue,牺牲一点查找开销换取更强的生命周期保证和 cache line 隔离。

  2. WrDepthBlock 的 padding:新路径的 WrDepthBlock 结构特意加了 uint64_t padding[7],确保每个 QP 的深度计数器独占一个 cache line(64 字节),避免 false sharing。

  3. 批量原子操作:老路径的 performPollCq 用 hash map 按 QP 指针对 WC 计数分组,每个 QP 只做一次原子减法。代码中注释掉的逐 slice 减法(__sync_fetch_and_sub(slice->rdma.qp_depth, 1))佐证了批量方案的重要性。

  4. 两阶段端点销毁:新路径的 beginDestroy() 先将 QP 转入 ERR 状态,让硬件自动 Flush 所有 inflight WR;finishDestroy() 等到 wr_depth 清零后才真正析构 QP,避免了 use-after-free。

  5. 无锁协调:提交线程(只增)和轮询线程(只减)通过 __sync_fetch_and_add/sub 原子操作协调,不需要互斥锁。写入的是不同 QP 的计数器,减少了竞争。


代码验证

以下是与 QP Depth 和 Slice 处理相关的主要源代码位置(基于 commit eaf724ab):

模块文件关键行说明
Slice 定义include/transport/transport.h104-165Transport::Slice 结构,qp_depth 指针字段在 L124
RdmaSlice 定义tent/include/tent/transport/rdma/slice.h72-99tent::RdmaSlice 结构
Slice 状态更新tent/include/tent/transport/rdma/slice.h101-121updateSliceStatus 函数
全局配置include/config.h34-77max_wr=256, max_cqe=4096, num_qp_per_ep=2
老路径 QP 深度提交src/transport/rdma_transport/rdma_endpoint.cpp578-661submitPostSend:分配 QP、设置 qp_depth 指针、原子增
老路径 CQ 轮询src/transport/rdma_transport/worker_pool.cpp269-349performPollCq:分组减、批量原子操作
新路径 Slice 创建tent/src/transport/rdma/rdma_transport.cpp318-413submitTransferTasks:拆分 Slice 链表
新路径 Slice 提交tent/src/transport/rdma/endpoint.cpp604-673submitSlices:reserveQuota + ibv_post_send
新路径 QP 额度管理tent/src/transport/rdma/endpoint.cpp732-752reserveQuota / cancelQuota
新路径 Slice 确认tent/src/transport/rdma/endpoint.cpp705-721acknowledge:顺序弹出 + cancelQuota
新路径 Worker 异步发送tent/src/transport/rdma/workers.cpp212-283asyncPostSend:路径解析、分组、提交
新路径 Worker 异步轮询tent/src/transport/rdma/workers.cpp286-381asyncPollCq:CQ 轮询、重试、超时
新路径路径解析tent/src/transport/rdma/workers.cpp680-701generatePostPath:选择 device、填 lkey/rkey
新路径设备选择tent/src/transport/rdma/workers.cpp550-620selectOptimalDevice:拓扑感知设备选择
Worker 线程tent/src/transport/rdma/workers.cpp399-432workerThread:主循环 (asyncPostSend + asyncPollCq)

修订说明

2026-05-26 根据源码审查修正

  1. config 字段名修正:原文将 config.h:50 的配置字段写作 block_size,实际字段名为 slice_size(值 65536 不变)。在 tent 内部该值通过 params_->workers.block_size 使用。
  2. submitPostSend 行号范围修正:3.3 节原文称 rdma_endpoint.cpp:597-661,实际函数从第 578 行开始,已修正为 578-661
  3. Slice 结构体补充:2.1 节的 Slice 代码片段原先省略了 statustaskdest_rkeysfrom_cachelkey_indexrkey_index 字段。现已补全所有 RDMA 相关字段并标注省略了 union 中的其他传输协议分支。
  4. 重试机制对比修正:第 6 节对比表原文”per-slice retry_cnt vs per-slice retry_count + max_retry_count”暗示旧路径缺少 max_retry_cnt。实际上旧路径同样包含该字段(transport.h:126),两者的真正差异在于存储位置:旧路径的 max_retry_cntretry_cnt 并存于 Slice 结构体内(per-slice),而新路径的 max_retry_count 是全局配置参数(params_->workers.max_retry_count)。
  5. 超时处理细节补充:第 5 节第 5 阶段原文仅写”标记 TIMEOUT”,实际代码中在 acknowledge(slice, TIMEOUT) 之后还会调用 disableEndpoint(slice),进而触发 ep->resetConnection()(workers.cpp:296-312),用于即时隔离故障设备对。已补充该细节。

Share this post on:

Previous Post
Mooncake TE 阅读手记-11-TE 接口设计
Next Post
Mooncake TE 阅读手记-09-错误处理策略