团团虾导读:LLM 推理的 KV Cache 传输动辄几百 MB,如果一次性提交会造成 QP 溢出。Mooncake 的解法是把大请求按 64KB 切片,配合两级反压(QP Depth + CQ 水位)控制提交节奏。这篇从 Slice 结构体讲起,拆解了旧路径和 TENT 新路径两套调度机制。
阅读版本: Mooncake v0.3.10.post2-104-geaf724ab (commit eaf724ab, 2026-05-20)
Mooncake QP Depth 与 RDMA Slice 处理机制
一、为什么需要 QP Depth
RDMA 传输的基本单位是 Work Request (WR)。上层提交 WR 到 QP (Queue Pair) 的 Send Queue,硬件异步取出并执行,完成后往 CQ (Completion Queue) 写入 WC (Work Completion)。如果提交速度超过网卡消化速度,WR 会在 QP 里堆积直至溢出 — 这就是 QP Depth 要解决的问题。
Mooncake 需要在高吞吐的 KV Cache 传输场景下保证:
- 尽量打满 RDMA 带宽(足够多的 inflight WR)
- 防止 QP overflow 导致 ibv_post_send 失败
- 防止 CQ overflow 导致 Completion 丢失
- 在多 QP / 多线程环境下无锁协调
为此,Mooncake 设计了一套两级反压 + 指针追踪的 QP Depth 机制。
二、Slice - 传输的基本调度单元
在大模型 KV Cache 传输场景中,一个 TransferRequest 往往携带几百 KB 到几 MB 的数据。Mooncake 会把一个大块请求按 slice_size(默认 65536 字节,见 config.h:50,在 tent 内部通过 params_->workers.block_size 使用)切分成多个 Slice,每个 Slice 对应一个 RDMA WR。
2.1 旧传输路径(Transport::Slice)
文件:mooncake-transfer-engine/include/transport/transport.h
struct Slice {
void *source_addr; // 本地数据地址
size_t length; // 本次传输字节数
TransferRequest::OpCode opcode; // READ or WRITE
SegmentID target_id; // 远端 Segment ID
std::string peer_nic_path; // 远端 NIC 路径
SliceStatus status; // 当前状态 (PENDING/POSTED/SUCCESS/TIMEOUT/FAILED)
TransferTask *task; // 所属传输任务
std::vector<uint32_t> dest_rkeys; // 远端 rkey 列表
bool from_cache; // 是否来自 Slice 缓存池
union {
struct {
uint64_t dest_addr; // 远端目标地址 = target_offset + offset
uint32_t source_lkey; // 本地内存 lkey
uint32_t dest_rkey; // 远端内存 rkey
int lkey_index; // lkey 在 MR 列表中的索引
int rkey_index; // rkey 在 MR 列表中的索引
volatile int *qp_depth; // ★ 指向 wr_depth_list_[qp_index] 的指针
uint32_t retry_cnt; // 当前重试次数
uint32_t max_retry_cnt; // 最大重试次数
} rdma;
// ... 其他传输协议
};
};
注意:以上展示了 RDMA 传输相关的所有字段,union 中省略了 TCP、NVMe-oF、CXL 等其他传输协议分支结构。完整定义见
transport.h:104-165。
关键字段解析:
- source_addr:本地内存起始地址,直接指向已注册的 MR 区域
- dest_addr:远端目标地址,由请求中的
target_offset + offset计算得出 - source_lkey / dest_rkey:本地和远端 MR 的 access key,用于 RDMA 操作权限验证
- qp_depth:指向对应 QP 的 wr_depth_list_ 计数器的指针,这是 QP Depth 机制的核心
- retry_cnt:失败重试计数,每次失败后递增
2.2 新传输路径(tent::RdmaSlice)
文件:mooncake-transfer-engine/tent/include/tent/transport/rdma/slice.h
struct RdmaSlice {
void* source_addr = nullptr; // 本地数据地址
uint64_t target_addr = 0; // 远端目标地址
size_t length = 0; // 传输长度
RdmaTask* task = nullptr; // 所属任务
RdmaSlice* next = nullptr; // 链表指针(切片链)
uint32_t source_lkey = 0; // 本地 lkey
uint32_t target_rkey = 0; // 远端 rkey
int source_dev_id = -1; // 本地 device ID
int target_dev_id = -1; // 远端 device ID
std::weak_ptr<RdmaEndPoint> ep_weak_ptr; // 所属 endpoint 弱引用
TransferStatusEnum word = INITIAL; // 完成状态
int qp_index = 0; // 绑定的 QP 索引
int retry_count = 0; // 重试计数
bool failed = false; // 提交失败标记
uint64_t enqueue_ts = 0; // 入队时间戳
uint64_t submit_ts = 0; // 提交时间戳
RailMonitor* rail_monitor = nullptr; // 设备对健康监控器
};
新传输路径的 Slice 在前者基础上做了几项改进:
- 链表结构:通过
next指针可以组成 Slice 链表,便于批量提交 - 设备亲和性:记录
source_dev_id和target_dev_id,支持双端设备选择 - 时间追踪:
enqueue_ts和submit_ts用于超时检测和性能统计 - 设备健康监控:
rail_monitor指针直接指向 RailMonitor,避免 CQ 热路径上的 map 查找
2.3 Slice 的创建流程
在 tent 传输中(rdma_transport.cpp:318-413),Slice 的创建遵循以下过程:
1. 对每个 TransferRequest
├── 计算切片数: num_slices = min(max(1, length/block_size), max_slice_count)
├── 尾块合并: 若尾部 < block_size * 0.25,合并到最后一块
├── 实际块大小: block_size = roundup(length/num_slices, default_block_size)
└── 循环创建每个 Slice:
├── source_addr = request.source + offset
├── target_addr = request.target_offset + offset
├── length = min(remaining, block_size)
├── task 指针关联
├── retry_count = 0
└── 插入到 worker 对应的 Slice 链表中
2. 将所有 Slice 链表提交到 Workers
三、QP Depth 的两级反压体系
3.1 第一级 — QP 级别
每个 QP 有一个 wr_depth_list_[qp_index] 计数器,记录该 QP 上当前有多少个已提交但未完成的 WR。
- 最大深度:
max_wr_depth_,默认 256(config.h:45) - 每个 endpoint(一个 NIC 对)默认有
num_qp_per_ep = 2个 QP(config.h:43)
提交前检查:
int qp_avail = max_wr_depth_ - wr_depth_list_[qp_index];
if (qp_avail <= 0) continue; // QP 已满,跳过
这一级防止单个 QP 被过度提交导致 ibv_post_send 失败。
3.2 第二级 — CQ 级别
每个 CQ 有一个 cq_outstanding_ 计数器,记录该 CQ 上所有 QP 的未完成 WR 总数。
- 最大容量:
max_cqe = 4096(config.h:41) - 每个 context 默认有
num_cq_per_ctx = 1个 CQ
提交前检查:
int cq_remaining = max_cqe - *cq_outstanding_;
// ...
wr_count = min(wr_count, cq_remaining);
这一级防止 CQ 溢出导致 WC 丢失。这是硬件硬限制 — ibv_create_cq 时创建的 CQE 数量是固定的。
3.3 提交时的分配策略
在旧路径 submitPostSend(rdma_endpoint.cpp:578-661)中,Slice 被按比例分配到各个 QP:
// 将剩余 slices 均匀分配到剩余的 QP 上
size_t chunk = (remaining_slices + remaining_qps - 1) / remaining_qps;
int wr_count = min(assigned_count, qp_avail); // 再受 QP 可用度约束
wr_count = min(wr_count, cq_remaining); // 再受 CQ 可用度约束
// 提交
__sync_fetch_and_add(&wr_depth_list_[qp_index], wr_count);
__sync_fetch_and_add(cq_outstanding_, wr_count);
ibv_post_send(qp_list_[qp_index], wr_list.data(), &bad_wr);
新路径 submitSlices(tent endpoint.cpp:604-673)更简洁:
int wr_count = min(
cq->maxCqe() - cq->getQuota(), // CQ 剩余空间
min(params_->max_qp_wr - wr_depth_list_[qp_index].value, // QP 剩余空间
(int)slice_list.size()) // 待提交的 slice 数量
);
reserveQuota(qp_index, wr_count); // 原子增加 wr_depth 和 CQ quota
ibv_post_send(qp_list_[qp_index], wr_list.data(), &bad_wr);
新路径的 reserveQuota(endpoint.cpp:732-743)同时维护 QP 深度和 CQ 配额:
bool RdmaEndPoint::reserveQuota(int qp_index, int num_entries) {
auto cq = context_->cq(qp_index % context_->cqCount());
if (!cq->reserveQuota(num_entries)) return false; // CQ 配额不足
auto prev = __sync_fetch_and_add(&wr_depth_list_[qp_index].value, num_entries);
__sync_fetch_and_add(&inflight_slices_, num_entries);
if (prev + num_entries > params_->max_qp_wr) {
cancelQuota(qp_index, num_entries); // 超额了,回退
return false;
}
return true;
}
四、“回邮地址” — QP Depth 指针追踪
这是 Mooncake 最精妙的设计之一。
4.1 提交侧:在 Slice 里埋下指针
// 老路径
slice->rdma.qp_depth = &wr_depth_list_[qp_index];
// 新路径(隐式)
// slice->qp_index 记录 QP 编号,acknowledge 时通过 qp_index 找到 wr_depth_list_
4.2 完成侧:通过 Slice 指针找到对应计数器
RDMA 硬件完成传输后,往 CQ 写一条 WC,其中 wr_id 就是提交时填入的 Slice* 指针。轮询线程拿到 Slice 后:
老路径(worker_pool.cpp:269-349)采用分组批量减:
std::unordered_map<volatile int*, int> qp_depth_set;
for each completion WC:
Slice* slice = (Slice*)wc[i].wr_id;
qp_depth_set[slice->rdma.qp_depth]++; // 按 QP 指针分组计数
// 每个 QP 只做一次原子减法(而非每个 slice 做一次)
for (auto& entry : qp_depth_set)
__sync_fetch_and_sub(entry.first, entry.second);
源码中第 289 行有一个被注释掉的替代方案:
// __sync_fetch_and_sub(slice->rdma.qp_depth, 1);
这说明作者做过权衡 — 逐片减法会产生大量原子竞争,而分组后每个 QP 只做一次原子操作,在高吞吐场景下性能差异显著。
新路径(tent workers.cpp:286-381)在 acknowledge 时处理:
// endpoint::acknowledge()
size_t RdmaEndPoint::acknowledge(RdmaSlice* slice, TransferStatusEnum status) {
auto qp_index = slice->qp_index;
auto& queue = slice_queue_[qp_index];
// 从 BoundedSliceQueue 中按序弹出,直到找到目标 slice
do {
current = queue.pop();
updateSliceStatus(current, status);
} while (current != slice);
cancelQuota(qp_index, num_entries); // 原子减法
}
这里 cancelQuota 同时减少 wr_depth_list_ 和 CQ quota:
void RdmaEndPoint::cancelQuota(int qp_index, int num_entries) {
__sync_fetch_and_sub(&wr_depth_list_[qp_index].value, num_entries);
__sync_fetch_and_sub(&inflight_slices_, num_entries);
cq->cancelQuota(num_entries);
}
4.3 可视化:指针归因流程
提交线程 硬件 RDMA 轮询线程
获取 QP#2 的可用深度 | |
qp_avail = 256 - wr_depth_list_[2] | |
= 200(可提交 5 个) | |
| |
slice_list[0].rdma.qp_depth = | |
&wr_depth_list_[2]; ← 埋指针 | |
| |
__sync_fetch_and_add( | |
&wr_depth_list_[2], 5) | |
→ wr_depth_list_[2] = 205 | |
| |
ibv_post_send(QP#2, wr_list, ...) ──────────→ 网卡开始异步执行 |
| |
| ... RDMA 传输中 ... |
| |
←── ←── WC 写入 CQ poll CQ
| (wr_id = Slice*) ↓
| Slice* s = (Slice*)wc.wr_id;
| s->rdma.qp_depth
| ↓
| 确定是 wr_depth_list_[2]
| qp_depth_set[ptr]++
| |
| __sync_fetch_and_sub(
| &wr_depth_list_[2], 5)
| → wr_depth_list_[2] = 200
五、Slice 的完整生命周期
以 tent 新传输路径为例,一个 Slice 从创建到完成经历以下阶段:
阶段 1:分配与拆分 (submitTransferTasks)
在 rdma_transport.cpp:374-404:
- 根据 block_size 将 TransferRequest 拆分成 RdmaSlice 链表
- 设置 source_addr、target_addr、length、task、retry_count
- 按 worker 分片(spray 模式 / 轮转模式)
阶段 2:路径解析 (generatePostPath)
在 workers.cpp:680-701:
- 根据 source_addr 找到本地 Buffer → lkey → 本地 device_id
- 根据 target_id + target_addr 找到远端 Buffer → rkey → 远端 device_id
- 首次提交:调用
selectOptimalDevice()基于拓扑和 RailMonitor 选择最优设备对 - 重试提交:调用
selectFallbackDevice()遍历备选设备对 - 缓存 RailMonitor 指针到 slice->rail_monitor(避免 CQ 热路径 map 查找)
阶段 3:分组与端点获取 (asyncPostSend)
在 workers.cpp:212-283:
- 按 PostPath(local_device_id, remote_segment_id, remote_device_id) 分组
- 获取或创建 RdmaEndPoint(需要时触发 RPC 握手建连)
- 调用
endpoint->submitSlices(slices, worker_id)
阶段 4:提交到硬件 (submitSlices)
在 endpoint.cpp:604-673:
- 计算可提交数量 = min(CQ 剩余, QP 剩余, 待提交数)
- 构造 ibv_send_wr 和 ibv_sge
- 调用
reserveQuota原子增加 wr_depth 和 CQ quota ibv_post_send()提交到硬件- 将成功的 slice 插入 BoundedSliceQueue(用于完成时的顺序确认)
阶段 5:轮询完成 (asyncPollCq)
在 workers.cpp:286-381:
- 批量 poll CQ(每次 64 条 WC)
- 对每条 WC:
- 成功:acknowledge(slice, COMPLETED),释放 DeviceQuota
- 失败(非 Flush):retry_count++,若未超限则 resubmit
- Flush 错误:直接失败(endpoint 正在销毁中)
- 超时检测:遍历 inflight_slice_set,超过
slice_timeout_ns的调用ep->acknowledge(slice, TIMEOUT)标记超时,同时调用disableEndpoint(slice)触发resetConnection(workers.cpp:296-312)
阶段 6:状态更新 (updateSliceStatus)
在 slice.h:101-121:
static inline void updateSliceStatus(RdmaSlice* slice, TransferStatusEnum status) {
// CAS 设置 slice 状态
if (!__sync_bool_compare_and_swap(&slice->word, PENDING, status)) return;
if (status == COMPLETED) {
__sync_fetch_and_add(&task->transferred_bytes, slice->length);
__sync_fetch_and_add(&task->success_slices, 1);
} else {
__sync_bool_compare_and_swap(&task->first_error, PENDING, status);
}
// 判断整个任务是否完成
int resolved = __sync_add_and_fetch(&task->resolved_slices, 1);
if (resolved >= task->num_slices) {
// 所有 slice 都处理完毕 → 设置 task->status_word
}
task->deref();
}
六、新旧传输路径对比
| 特性 | 旧路径 (Transport::Slice) | 新路径 (tent::RdmaSlice) |
|---|---|---|
| Slice 队列 | unordered_map + vector (无界) | BoundedMPSCQueue + BoundedSliceQueue |
| QP 深度存储 | volatile int* wr_depth_list_ | WrDepthBlock[].value (带 cache line padding) |
| QP 归因方式 | Slice 存指向 wr_depth 的指针 | Slice 存 qp_index,通过 BoundedSliceQueue 追踪 |
| CQ 配额管理 | 全局 cq_outstanding_ 计数器 | 每个 CQ 独立 Quota 对象 |
| 设备选择 | selectDevice + 一次性 | selectOptimalDevice + selectFallbackDevice + 重试 |
| 重试机制 | per-slice retry_cnt + max_retry_cnt(均存于 Slice 内) | per-slice retry_count + 全局 max_retry_count(存于 params) |
| 健康监控 | 简单计数 RNIC 错误 | RailMonitor + 设备对粒度 |
| 超时处理 | 无显式超时 | slice_timeout_ns + 软件超时检测 |
| Endpoint 销毁 | 一次性 deconstruct | 两阶段 beginDestroy/finishDestroy |
| 内存分配 | new/delete + 对象池 | Slab 分配器 (RdmaSliceStorage) |
七、关键设计要点
-
指针追踪 vs 索引追踪:老路径用指针直接指向计数器,零开销归因;新路径用索引配合 BoundedSliceQueue,牺牲一点查找开销换取更强的生命周期保证和 cache line 隔离。
-
WrDepthBlock 的 padding:新路径的
WrDepthBlock结构特意加了uint64_t padding[7],确保每个 QP 的深度计数器独占一个 cache line(64 字节),避免 false sharing。 -
批量原子操作:老路径的
performPollCq用 hash map 按 QP 指针对 WC 计数分组,每个 QP 只做一次原子减法。代码中注释掉的逐 slice 减法(__sync_fetch_and_sub(slice->rdma.qp_depth, 1))佐证了批量方案的重要性。 -
两阶段端点销毁:新路径的
beginDestroy()先将 QP 转入 ERR 状态,让硬件自动 Flush 所有 inflight WR;finishDestroy()等到 wr_depth 清零后才真正析构 QP,避免了 use-after-free。 -
无锁协调:提交线程(只增)和轮询线程(只减)通过
__sync_fetch_and_add/sub原子操作协调,不需要互斥锁。写入的是不同 QP 的计数器,减少了竞争。
代码验证
以下是与 QP Depth 和 Slice 处理相关的主要源代码位置(基于 commit eaf724ab):
| 模块 | 文件 | 关键行 | 说明 |
|---|---|---|---|
| Slice 定义 | include/transport/transport.h | 104-165 | Transport::Slice 结构,qp_depth 指针字段在 L124 |
| RdmaSlice 定义 | tent/include/tent/transport/rdma/slice.h | 72-99 | tent::RdmaSlice 结构 |
| Slice 状态更新 | tent/include/tent/transport/rdma/slice.h | 101-121 | updateSliceStatus 函数 |
| 全局配置 | include/config.h | 34-77 | max_wr=256, max_cqe=4096, num_qp_per_ep=2 |
| 老路径 QP 深度提交 | src/transport/rdma_transport/rdma_endpoint.cpp | 578-661 | submitPostSend:分配 QP、设置 qp_depth 指针、原子增 |
| 老路径 CQ 轮询 | src/transport/rdma_transport/worker_pool.cpp | 269-349 | performPollCq:分组减、批量原子操作 |
| 新路径 Slice 创建 | tent/src/transport/rdma/rdma_transport.cpp | 318-413 | submitTransferTasks:拆分 Slice 链表 |
| 新路径 Slice 提交 | tent/src/transport/rdma/endpoint.cpp | 604-673 | submitSlices:reserveQuota + ibv_post_send |
| 新路径 QP 额度管理 | tent/src/transport/rdma/endpoint.cpp | 732-752 | reserveQuota / cancelQuota |
| 新路径 Slice 确认 | tent/src/transport/rdma/endpoint.cpp | 705-721 | acknowledge:顺序弹出 + cancelQuota |
| 新路径 Worker 异步发送 | tent/src/transport/rdma/workers.cpp | 212-283 | asyncPostSend:路径解析、分组、提交 |
| 新路径 Worker 异步轮询 | tent/src/transport/rdma/workers.cpp | 286-381 | asyncPollCq:CQ 轮询、重试、超时 |
| 新路径路径解析 | tent/src/transport/rdma/workers.cpp | 680-701 | generatePostPath:选择 device、填 lkey/rkey |
| 新路径设备选择 | tent/src/transport/rdma/workers.cpp | 550-620 | selectOptimalDevice:拓扑感知设备选择 |
| Worker 线程 | tent/src/transport/rdma/workers.cpp | 399-432 | workerThread:主循环 (asyncPostSend + asyncPollCq) |
修订说明
2026-05-26 根据源码审查修正
- config 字段名修正:原文将
config.h:50的配置字段写作block_size,实际字段名为slice_size(值 65536 不变)。在 tent 内部该值通过params_->workers.block_size使用。 - submitPostSend 行号范围修正:3.3 节原文称
rdma_endpoint.cpp:597-661,实际函数从第 578 行开始,已修正为578-661。 - Slice 结构体补充:2.1 节的 Slice 代码片段原先省略了
status、task、dest_rkeys、from_cache、lkey_index、rkey_index字段。现已补全所有 RDMA 相关字段并标注省略了 union 中的其他传输协议分支。 - 重试机制对比修正:第 6 节对比表原文”per-slice retry_cnt vs per-slice retry_count + max_retry_count”暗示旧路径缺少
max_retry_cnt。实际上旧路径同样包含该字段(transport.h:126),两者的真正差异在于存储位置:旧路径的max_retry_cnt与retry_cnt并存于 Slice 结构体内(per-slice),而新路径的max_retry_count是全局配置参数(params_->workers.max_retry_count)。 - 超时处理细节补充:第 5 节第 5 阶段原文仅写”标记 TIMEOUT”,实际代码中在
acknowledge(slice, TIMEOUT)之后还会调用disableEndpoint(slice),进而触发ep->resetConnection()(workers.cpp:296-312),用于即时隔离故障设备对。已补充该细节。