团团虾导读:错误处理是分布式系统中容易被忽视但实际最考验设计功力的部分。Mooncake TE 采用的是 fail fast at submission, poll-and-decide at completion 策略——提交时立即返回可预见的错误,传输中的失败由 transport 层自动重试,最终由调用方通过轮询自行决定如何处理。这篇从错误码体系讲起,逐层拆解到调用方的决策空间。
阅读版本: Mooncake v0.3.10.post2-104-geaf724ab (commit eaf724ab, 2026-05-20)
Mooncake Transfer Engine 错误处理策略深度解析
一、整体架构概览
Mooncake Transfer Engine(以下简称 TE)的错误处理采用 分层异步模型,核心设计哲学是 “fail fast at submission, poll-and-decide at completion”:
- 提交时(submit):立即对可预见的错误返回错误码(参数无效、地址未注册、batch 超容等)
- 传输中(in-flight):由各 transport 层在 slice 级别自动重试,重试耗尽后标记失败
- 完成时(poll):由调用方轮询
getTransferStatus()检查状态,自行决定重试策略
整个错误处理链路从底层到顶层分为四个层级:错误码体系 -> 传输状态机 -> Slice/Task/Batch 分层 -> 调用方决策。
二、错误码体系:两层互补设计
2.1 C 语言宏(error.h)
用于返回 int 类型错误码的接口函数(如 init, registerLocalMemory, openSegment 等):
| 宏定义 | 数值 | 含义 |
|---|---|---|
ERR_INVALID_ARGUMENT | -1 | 无效参数 |
ERR_TOO_MANY_REQUESTS | -2 | 请求过多(超出 batch 容量) |
ERR_ADDRESS_NOT_REGISTERED | -3 | 地址未注册(RDMA 内存未注册) |
ERR_BATCH_BUSY | -4 | batch 仍有未完成任务 |
ERR_DEVICE_NOT_FOUND | -6 | 设备未找到 |
ERR_ADDRESS_OVERLAPPED | -7 | 地址重叠 |
ERR_DNS | -101 | DNS 解析错误 |
ERR_SOCKET | -102 | Socket 错误 |
ERR_MALFORMED_JSON | -103 | JSON 格式错误 |
ERR_REJECT_HANDSHAKE | -104 | 握手被拒绝 |
ERR_METADATA | -200 | 元数据错误 |
ERR_ENDPOINT | -201 | Endpoint 错误 |
ERR_CONTEXT | -202 | Context 错误 |
ERR_NUMA | -300 | NUMA 相关错误 |
ERR_CLOCK | -301 | 时钟错误 |
ERR_MEMORY | -302 | 内存分配错误 |
ERR_NOT_IMPLEMENTED | -303 | 功能未实现 |
错误码按前缀分组:-1xx 为网络/通信层,-2xx 为元数据层,-3xx 为系统层。
2.2 C++ Status 类(status.h)
借鉴 RocksDB 设计,用于返回 Status 类型的关键接口(submitTransfer, getTransferStatus, freeBatchID 等)。核心结构:
// include/common/base/status.h
class Status final {
public:
enum class Code : uint16_t {
kOk = 0,
kInvalidArgument = 1,
kTooManyRequests = 2,
kAddressNotRegistered = 3,
kBatchBusy = 4,
kDeviceNotFound = 6,
kAddressOverlapped = 7,
kNotSupportedTransport = 8,
// ... 其他码值与 error.h 错码一一对应
kNotImplemented = 999,
};
Code code() const; // 返回错误码
std::string_view message() const; // 返回错误消息
bool ok() const; // 检查是否成功
static Status OK(); // 工厂方法:创建 OK 状态
static Status InvalidArgument(std::string_view msg); // 等工厂方法...
};
大部分 Status::Code 值通过取绝对值与 error.h 宏对应(如 kInvalidArgument=1 ↔ ERR_INVALID_ARGUMENT=-1)。存在两处例外:kNotImplemented=999 与 ERR_NOT_IMPLEMENTED=-303 数值不同;kNotSupportedTransport=8 在 error.h 中没有对应的 C 宏。message 字段采用 const char* 而非 std::string,以优化小型错误消息的 SSO 开销(8 字节 vs 24-32 字节)。
三、传输状态机
每个 transfer 经历明确的状态流转,由 TransferStatusEnum 定义:
enum TransferStatusEnum {
WAITING, // 初始状态:已提交但未开始处理
PENDING, // 进行中:部分 slice 已发出
INVALID, // 无效请求
CANCELED, // 已取消
COMPLETED, // 成功完成(所有 slice 成功)
TIMEOUT, // 超时(超过 slice_timeout 仍未完成)
FAILED // 失败(至少一个 slice 失败)
};
典型状态流转路径:
成功路径: WAITING -> PENDING -> COMPLETED
失败路径: WAITING -> PENDING -> FAILED
超时路径: WAITING -> PENDING -> TIMEOUT
四、四层错误处理架构
4.1 Slice 层(最底层)
每次传输请求(TransferRequest)按 slice_size(默认 64KB)切分为多个 Slice。每个 Slice 是错误处理的最小粒度。
// include/transport/transport.h 行 104-240
struct Slice {
enum SliceStatus { PENDING, POSTED, SUCCESS, TIMEOUT, FAILED };
void markSuccess() {
status = Slice::SUCCESS;
__atomic_fetch_add(&task->transferred_bytes, length, __ATOMIC_RELAXED);
__atomic_fetch_add(&task->success_slice_count, 1, __ATOMIC_RELAXED);
}
void markFailed() {
status = Slice::FAILED;
__atomic_fetch_add(&task->failed_slice_count, 1, __ATOMIC_RELAXED);
}
volatile int64_t ts; // 时间戳(用于超时检测)
// RDMA 特有字段
struct {
uint32_t retry_cnt; // 当前重试次数
uint32_t max_retry_cnt; // 最大重试次数(从全局配置读取)
} rdma;
// ... 其他 transport 特定字段
};
关键设计点:
markSuccess/markFailed使用__atomic_fetch_add原子操作,无需加锁- Slice 对象通过
ThreadLocalSliceCache复用(容量 4096 的环形缓冲区),减少内存分配 - RDMA slice 在
rdma.retry_cnt和rdma.max_retry_cnt之间跟踪重试状态
4.2 Task 层
一个 Task 对应一个 TransferRequest(一个传输请求可能被拆成多个 slice)。
// include/transport/transport.h 行 283-320
struct TransferTask {
volatile uint64_t slice_count = 0; // 总 slice 数
volatile uint64_t success_slice_count = 0; // 成功 slice 数
volatile uint64_t failed_slice_count = 0; // 失败 slice 数
volatile uint64_t transferred_bytes = 0; // 已传输字节数
volatile bool is_finished = false; // 任务是否已结束
Transport *transport_ = nullptr; // 处理此任务的 transport
#ifdef USE_EVENT_DRIVEN_COMPLETION
volatile uint64_t completed_slice_count = 0; // 已完成的 slice 数(事件驱动模式)
#endif
};
完成判断逻辑在 multi_transport.cpp:getTransferStatus()(行 191-257):
所有 slice 都成功 => COMPLETED
任意 slice 失败 => FAILED
还有 slice 未完成 => WAITING
有 slice 超时 => TIMEOUT
4.3 Batch 层
一个 Batch 包含多个 Task,通过 BatchDesc 管理:
// include/transport/transport.h 行 322-343
struct BatchDesc {
BatchID id;
size_t batch_size;
std::vector<TransferTask> task_list;
std::atomic<bool> has_failure{false}; // 是否有失败
std::atomic<bool> is_finished{false}; // 是否全部完成
std::atomic<uint64_t> finished_transfer_bytes{0};
};
MultiTransport::getBatchTransferStatus() 的聚合逻辑(multi_transport.cpp 行 259-303):
- 遍历所有 task 的
getTransferStatus - 任意 task 为
FAILED=> 整个 batch 为FAILED - 所有 task 为
COMPLETED=> 整个 batch 为COMPLETED - 其他情况 =>
WAITING
4.4 Transport 层
TE 支持 15 种传输协议(RDMA/TCP/CXL/NVLink/NVMeOF/EFA/HIP/MACA/UB/…)每种独立实现错误处理。这里详细分析两个核心 transport:
五、RDMA Transport 的 slice 级重试策略
RDMA transport 是最复杂的 transport,其错误处理体现在 RdmaTransport::submitTransferTask() 中(rdma_transport.cpp 行 465-582)。
5.1 四层重试
第一层:设备选择重试(selectDevice)
// rdma_transport.cpp 行 691-728
int RdmaTransport::selectDevice(SegmentDesc *desc, uint64_t offset,
size_t length, std::string_view hint,
int &buffer_id, int &device_id,
int retry_count) {
if (desc == nullptr) return ERR_ADDRESS_NOT_REGISTERED;
const auto &buffers = desc->buffers;
for (buffer_id = 0; buffer_id < static_cast<int>(buffers.size()); ++buffer_id) {
const auto &buffer = buffers[buffer_id];
// 检查 offset 是否在 buffer 范围内
if (offset < buffer.addr || length > buffer.length ||
offset - buffer.addr > buffer.length - length) continue;
// 尝试按 location 选择设备,失败则回退到 wildcard
device_id = hint.empty()
? desc->topology.selectDevice(location, retry_count)
: desc->topology.selectDevice(location, hint, retry_count);
if (device_id >= 0) return 0;
// 使用 wildcard location 再试一次
device_id = hint.empty()
? desc->topology.selectDevice(kWildcardLocation, retry_count)
: desc->topology.selectDevice(kWildcardLocation, hint, retry_count);
if (device_id >= 0) return 0;
}
return ERR_ADDRESS_NOT_REGISTERED;
}
第二层:Slice 级别重试(主循环)
// rdma_transport.cpp 行 523-551
while (retry_cnt < kMaxRetryCount && !found_device) {
if (selectDevice(local_segment_desc.get(),
(uint64_t)slice->source_addr, slice->length,
buffer_id, device_id, retry_cnt++))
continue;
// ... 检查 device 是否 active
found_device = true;
break;
}
if (!found_device) {
// 清理已分配 slice,返回 AddressNotRegistered
return Status::AddressNotRegistered(
"Memory region not registered by any active device(s)");
}
关键参数:
slice->rdma.retry_cnt从request.advise_retry_cnt开始(默认为 0)slice->rdma.max_retry_cnt=globalConfig().retry_cnt(默认 9)- 总共最多 9 次重试机会
第三层:Worker Pool 后提交阶段重试
Worker Pool 在 post-send 和 CQ 轮询阶段均有重试逻辑,与提交阶段共用同一 slice->rdma.retry_cnt / max_retry_cnt 计数器:
- performPostSend(worker_pool.cpp 行 263-266):当 endpoint 为 null、已下线或连接建立失败时,递增 slice 的
rdma.retry_cnt并调用redispatch()重新调度到可能不同的 endpoint - performPollCq(worker_pool.cpp 行 325-333):处理 CQ 非 Flush 错误时,递增
retry_cnt,若未超过max_retry_cnt则将 slice 重新入队等待重试 - redispatch(worker_pool.cpp 行 352-389):统一的重调度入口,检查
retry_cnt >= max_retry_cnt,达到上限则markFailed(),否则重新选择设备和目标路径后入队
注意:CQ Flush 错误(IBV_WC_WR_FLUSH_ERR)是特殊处理——直接 markFailed() 不重试,因为这类错误发生在 QP 转换到 ERR 状态时(如 endpoint 销毁),并非真正的网络错误。
第四层:submitPostSend 后异步等待 CQ 完成
RDMA 的 submitPostSend 将 slice 提交到队列对(QP),后续通过轮询 Completion Queue 异步完成。Slice 的成功/失败回调在执行完成后触发 markSuccess/markFailed。
5.2 重试配置总览
| 配置项 | 默认值 | 说明 |
|---|---|---|
retry_cnt | 9 | 全局最大重试次数 |
slice_size | 65536 (64KB) | slice 切分粒度 |
slice_timeout | -1(禁用) | slice 超时秒数 |
fragment_limit | 16384 (16KB) | 最后一个 slice 可与前一个合并的阈值 |
六、超时检测机制
超时检测在 MultiTransport::getTransferStatus() 中实现(multi_transport.cpp 行 202-216),由调用方轮询驱动而非主动中断:
auto checkSliceTimeout = [&](const Transport::TransferTask& t) -> bool {
if (globalConfig().slice_timeout <= 0) return false; // 默认禁用
auto current_ts = getCurrentTimeInNano();
const int64_t kPacketDeliveryTimeout =
globalConfig().slice_timeout * 1000000000;
for (auto& slice : t.slice_list) {
auto ts = slice->ts;
if (ts > 0 && current_ts > ts &&
current_ts - ts > kPacketDeliveryTimeout) {
LOG(INFO) << "Slice timeout detected";
return true;
}
}
return false;
};
关键特征:
slice_timeout默认为 -1(超时检测禁用)- 检测发生在
getTransferStatus()调用时,不会主动中止 in-flight slice - 轮询到超时后将状态标记为
TIMEOUT,调用方可以决定重试还是放弃
七、对端存活检测(Peer Liveness)
TE 提供对端存活探测能力,但不自动重连:
// transfer_engine_impl.cpp 行 445-451
int TransferEngineImpl::probePeerAliveByID(SegmentID target_id) {
auto desc = metadata_->getSegmentDescByID(target_id);
if (!desc) {
return ERR_METADATA;
}
return metadata_->sendProbe(desc->name);
}
通过 metadata server 向目标发送探测消息。API 层面映射为 PeerLiveness 枚举:
// transfer_engine.h 行 37-40
enum class PeerLiveness : uint8_t {
Alive = 0,
Unreachable = 1,
};
调用方负责根据 liveness 结果决定后续处理策略。
八、TCP Transport 的错误处理
TCP transport 使用异常捕获 + 连接池清理的模式(tcp_transport.cpp 行 875-967):
void TcpTransport::startTransfer(Slice* slice) {
auto desc = metadata_->getSegmentDescByID(slice->target_id);
if (!desc) {
slice->markFailed(); // 预检查失败直接标记
return;
}
// ... RPC meta 查询、连接获取 ...
try {
auto session = std::make_shared<ClientSession>(socket);
// 注册异步完成回调
session->on_finalize_ = [slice](TransferStatusEnum status) {
if (status == TransferStatusEnum::COMPLETED)
slice->markSuccess();
else
slice->markFailed();
};
session->initiate(slice->source_addr, slice->tcp.dest_addr,
slice->length, slice->opcode);
} catch (std::exception& e) {
LOG(ERROR) << "Transfer exception: " << e.what();
// 关闭异常连接
socket->close(ec);
// 从连接池中移除异常连接
if (enable_connection_pool_) {
ConnectionKey key{meta_entry.ip_or_host_name, desc->tcp_data_port};
// ... 在连接池中查找并移除异常连接 ...
}
slice->markFailed();
}
}
TCP 的错误处理特点:
- 预检查阶段(无效 segment ID、无 RPC meta、无法获取连接)直接
markFailed并返回 - 传输阶段异常通过
try/catch捕获 - 异常连接不会返回连接池,而是直接关闭并从池中移除
- 连接池启用异常清理逻辑,确保不一致的 socket 不会污染后续请求
九、调用方职责:关键设计哲学
TE 不做自动的 batch 级别重试。调用方的标准使用模式如下:
// 示例: transfer_engine_bench_with_retry.cpp 行 205-227
auto batch_id = engine->allocateBatchID(batch_size);
auto s = engine->submitTransfer(batch_id, requests); // 1. 提交
if (!s.ok()) {
LOG(INFO) << "Found Failed Requests"; // 2. 提交失败
} else {
for (int task_id = 0; task_id < batch_size; ++task_id) {
bool completed = false;
TransferStatus status;
while (!completed) {
Status s = engine->getTransferStatus(batch_id, task_id, status); // 3. 轮询
LOG_ASSERT(s.ok());
if (status.s == TransferStatusEnum::COMPLETED)
completed = true; // 成功
else if (status.s == TransferStatusEnum::FAILED) {
LOG(INFO) << "Found Failed Requests"; // 失败,调用方可重试
completed = true;
}
// TIMEOUT 状态也需要调用方自行处理
}
}
s = engine->freeBatchID(batch_id); // 4. 释放 batch
}
调用方决策矩阵:
| 状态 | 含义 | 调用方可选操作 |
|---|---|---|
COMPLETED | 传输成功 | 释放 batch,继续下一轮 |
FAILED | 所有 slice 重试耗尽 | 释放 batch 或重新提交 |
TIMEOUT | 超过 timeout 时间 | 释放 batch 或重试(建议先 probePeerAlive) |
WAITING | 仍在进行中 | 继续轮询 |
INVALID | 请求无效 | 检查参数后重新构造请求 |
十、代码验证
以下是对上述关键路径的源码级验证:
10.1 错误码定义
文件:mooncake-transfer-engine/include/error.h(行 18-38)
- 共 17 个错误码宏,按语义分组(网络/元数据/系统),数值跨度为 -1 到 -303
文件:mooncake-transfer-engine/include/common/base/status.h(行 33-54)
Status::Code枚举值精确对应error.h中每个宏的值(kInvalidArgument=1对应ERR_INVALID_ARGUMENT=-1)- 注释明确说明设计借鉴 RocksDB(行 15-17)
10.2 Slice 原子计数器
文件:mooncake-transfer-engine/include/transport/transport.h(行 168-182)
markSuccess使用__atomic_fetch_add原子递增success_slice_count和transferred_bytesmarkFailed同样使用__atomic_fetch_add原子递增failed_slice_count- 无锁设计(
__ATOMIC_RELAXED),高性能路径
10.3 RDMA 重试逻辑
文件:mooncake-transfer-engine/src/transport/rdma_transport/rdma_transport.cpp
- 行 471:
const int kMaxRetryCount = globalConfig().retry_cnt;(从配置读取重试上限) - 行 507-508:
slice->rdma.retry_cnt = request.advise_retry_cnt; slice->rdma.max_retry_cnt = kMaxRetryCount; - 行 523-540:
while (retry_cnt < kMaxRetryCount && !found_device)循环重试设备选择 - 行 548-549:重试耗尽后返回
Status::AddressNotRegistered - 行 691-728:
selectDevice实现内核逻辑,包含 location 匹配和 wildcard 回退
10.4 超时检测
文件:mooncake-transfer-engine/include/config.h(行 56)
int64_t slice_timeout = -1; // 默认禁用
文件:mooncake-transfer-engine/src/multi_transport.cpp(行 202-216)
if (globalConfig().slice_timeout <= 0) return false; // 最早短路
10.5 TCP transport 连接池异常清理
文件:mooncake-transfer-engine/src/transport/tcp_transport/tcp_transport.cpp(行 933-966)
- 异常发生时关闭 socket、遍历连接池查找并移除异常连接
- 注释明确说明不归还到连接池:“Don’t return it to the pool as it may be in an inconsistent state”
10.6 Batch 级别聚合
文件:mooncake-transfer-engine/src/multi_transport.cpp(行 259-303)
getBatchTransferStatus遍历所有 task- 任意 task
FAILED=> batchFAILED - 提前短路返回(行 287-288)
10.7 Peer 存活探测
文件:mooncake-transfer-engine/src/transfer_engine_impl.cpp(行 445-451)
- 通过
metadata_->sendProbe()探测,返回int类型错误码 - 不自动重连
10.8 调用方重试示例
文件:mooncake-transfer-engine/example/transfer_engine_bench_with_retry.cpp(行 205-227)
- 展示了完整的 submitTransfer -> getTransferStatus 轮询 -> freeBatchID 流程
- 对 FAILED 状态只记录日志,未做自动重试,体现”调用方决策”的设计哲学
十一、总结
Mooncake Transfer Engine 的错误处理策略可以概括为:
- 分层解耦:Slice 级别自动重试,Task/Batch 级别状态聚合,Transport 层独立实现,调用方最终决策
- 异步轮询:传输提交几乎不阻塞,完成状态通过轮询获取,不引入主动中断或回调
- 原子无锁:Slice 的成功/失败计数使用
__atomic_fetch_add,避免锁竞争 - 默认保守:slice_timeout 默认禁用(-1),需要调用方自行启用
- 不做自动恢复:不自动重试 batch,不自动重连断开的对端,一切由上层控制
- 内存复用:Slice 对象通过 ThreadLocalSliceCache(容量 4096)环形缓冲区复用,减少分配开销
修订说明
本文基于审阅意见(04-review-notes.md)对以下内容进行了修订:
- Section 10.1:错误码宏数量从”16”更正为”17”(与 Section 2.1 表格及
error.h源码一致) - Section 2.2:修正”Status::Code 与 error.h 完全对应”的表述,标注两处例外:
kNotImplemented=999与ERR_NOT_IMPLEMENTED=-303数值不同,kNotSupportedTransport=8在 error.h 中无对应 C 宏 - Section 4.4:传输协议数量从”20+ 种”更正为”15 种”(与
installTransport()源码一致) - Section 5.1:重试层数从”三层”扩展为”四层”,补充 Worker Pool 后提交阶段重调度(
performPostSend/performPollCq→redispatch) - Section 9:补充示例代码中省略的
Status s =返回值捕获和LOG_ASSERT(s.ok())断言 - Section 10.3:修正三处行号引用(主循环结束行 551→540,selectDevice 结束行 727→728)