Skip to content
团子云技术 Lite 1.048596
Go back

Mooncake TE 阅读手记-09-错误处理策略

团团虾导读:错误处理是分布式系统中容易被忽视但实际最考验设计功力的部分。Mooncake TE 采用的是 fail fast at submission, poll-and-decide at completion 策略——提交时立即返回可预见的错误,传输中的失败由 transport 层自动重试,最终由调用方通过轮询自行决定如何处理。这篇从错误码体系讲起,逐层拆解到调用方的决策空间。

阅读版本: Mooncake v0.3.10.post2-104-geaf724ab (commit eaf724ab, 2026-05-20)

Mooncake Transfer Engine 错误处理策略深度解析

一、整体架构概览

Mooncake Transfer Engine(以下简称 TE)的错误处理采用 分层异步模型,核心设计哲学是 “fail fast at submission, poll-and-decide at completion”

整个错误处理链路从底层到顶层分为四个层级:错误码体系 -> 传输状态机 -> Slice/Task/Batch 分层 -> 调用方决策


二、错误码体系:两层互补设计

2.1 C 语言宏(error.h

用于返回 int 类型错误码的接口函数(如 init, registerLocalMemory, openSegment 等):

宏定义数值含义
ERR_INVALID_ARGUMENT-1无效参数
ERR_TOO_MANY_REQUESTS-2请求过多(超出 batch 容量)
ERR_ADDRESS_NOT_REGISTERED-3地址未注册(RDMA 内存未注册)
ERR_BATCH_BUSY-4batch 仍有未完成任务
ERR_DEVICE_NOT_FOUND-6设备未找到
ERR_ADDRESS_OVERLAPPED-7地址重叠
ERR_DNS-101DNS 解析错误
ERR_SOCKET-102Socket 错误
ERR_MALFORMED_JSON-103JSON 格式错误
ERR_REJECT_HANDSHAKE-104握手被拒绝
ERR_METADATA-200元数据错误
ERR_ENDPOINT-201Endpoint 错误
ERR_CONTEXT-202Context 错误
ERR_NUMA-300NUMA 相关错误
ERR_CLOCK-301时钟错误
ERR_MEMORY-302内存分配错误
ERR_NOT_IMPLEMENTED-303功能未实现

错误码按前缀分组:-1xx 为网络/通信层,-2xx 为元数据层,-3xx 为系统层。

2.2 C++ Status 类(status.h

借鉴 RocksDB 设计,用于返回 Status 类型的关键接口(submitTransfer, getTransferStatus, freeBatchID 等)。核心结构:

// include/common/base/status.h
class Status final {
public:
    enum class Code : uint16_t {
        kOk = 0,
        kInvalidArgument = 1,
        kTooManyRequests = 2,
        kAddressNotRegistered = 3,
        kBatchBusy = 4,
        kDeviceNotFound = 6,
        kAddressOverlapped = 7,
        kNotSupportedTransport = 8,
        // ... 其他码值与 error.h 错码一一对应
        kNotImplemented = 999,
    };

    Code code() const;              // 返回错误码
    std::string_view message() const; // 返回错误消息
    bool ok() const;                // 检查是否成功
    static Status OK();             // 工厂方法:创建 OK 状态
    static Status InvalidArgument(std::string_view msg); // 等工厂方法...
};

大部分 Status::Code 值通过取绝对值与 error.h 宏对应(如 kInvalidArgument=1ERR_INVALID_ARGUMENT=-1)。存在两处例外:kNotImplemented=999ERR_NOT_IMPLEMENTED=-303 数值不同;kNotSupportedTransport=8error.h 中没有对应的 C 宏。message 字段采用 const char* 而非 std::string,以优化小型错误消息的 SSO 开销(8 字节 vs 24-32 字节)。


三、传输状态机

每个 transfer 经历明确的状态流转,由 TransferStatusEnum 定义:

enum TransferStatusEnum {
    WAITING,    // 初始状态:已提交但未开始处理
    PENDING,    // 进行中:部分 slice 已发出
    INVALID,    // 无效请求
    CANCELED,   // 已取消
    COMPLETED,  // 成功完成(所有 slice 成功)
    TIMEOUT,    // 超时(超过 slice_timeout 仍未完成)
    FAILED      // 失败(至少一个 slice 失败)
};

典型状态流转路径:

成功路径: WAITING -> PENDING -> COMPLETED
失败路径: WAITING -> PENDING -> FAILED
超时路径: WAITING -> PENDING -> TIMEOUT

四、四层错误处理架构

4.1 Slice 层(最底层)

每次传输请求(TransferRequest)按 slice_size(默认 64KB)切分为多个 Slice。每个 Slice 是错误处理的最小粒度。

// include/transport/transport.h 行 104-240
struct Slice {
    enum SliceStatus { PENDING, POSTED, SUCCESS, TIMEOUT, FAILED };

    void markSuccess() {
        status = Slice::SUCCESS;
        __atomic_fetch_add(&task->transferred_bytes, length, __ATOMIC_RELAXED);
        __atomic_fetch_add(&task->success_slice_count, 1, __ATOMIC_RELAXED);
    }

    void markFailed() {
        status = Slice::FAILED;
        __atomic_fetch_add(&task->failed_slice_count, 1, __ATOMIC_RELAXED);
    }

    volatile int64_t ts;    // 时间戳(用于超时检测)

    // RDMA 特有字段
    struct {
        uint32_t retry_cnt;       // 当前重试次数
        uint32_t max_retry_cnt;   // 最大重试次数(从全局配置读取)
    } rdma;
    // ... 其他 transport 特定字段
};

关键设计点:

4.2 Task 层

一个 Task 对应一个 TransferRequest(一个传输请求可能被拆成多个 slice)。

// include/transport/transport.h 行 283-320
struct TransferTask {
    volatile uint64_t slice_count = 0;         // 总 slice 数
    volatile uint64_t success_slice_count = 0; // 成功 slice 数
    volatile uint64_t failed_slice_count = 0;  // 失败 slice 数
    volatile uint64_t transferred_bytes = 0;   // 已传输字节数
    volatile bool is_finished = false;         // 任务是否已结束
    Transport *transport_ = nullptr;           // 处理此任务的 transport

#ifdef USE_EVENT_DRIVEN_COMPLETION
    volatile uint64_t completed_slice_count = 0; // 已完成的 slice 数(事件驱动模式)
#endif
};

完成判断逻辑在 multi_transport.cpp:getTransferStatus()(行 191-257):

所有 slice 都成功          => COMPLETED
任意 slice 失败            => FAILED
还有 slice 未完成          => WAITING
有 slice 超时              => TIMEOUT

4.3 Batch 层

一个 Batch 包含多个 Task,通过 BatchDesc 管理:

// include/transport/transport.h 行 322-343
struct BatchDesc {
    BatchID id;
    size_t batch_size;
    std::vector<TransferTask> task_list;

    std::atomic<bool> has_failure{false};        // 是否有失败
    std::atomic<bool> is_finished{false};        // 是否全部完成
    std::atomic<uint64_t> finished_transfer_bytes{0};
};

MultiTransport::getBatchTransferStatus() 的聚合逻辑(multi_transport.cpp 行 259-303):

4.4 Transport 层

TE 支持 15 种传输协议(RDMA/TCP/CXL/NVLink/NVMeOF/EFA/HIP/MACA/UB/…)每种独立实现错误处理。这里详细分析两个核心 transport:

五、RDMA Transport 的 slice 级重试策略

RDMA transport 是最复杂的 transport,其错误处理体现在 RdmaTransport::submitTransferTask() 中(rdma_transport.cpp 行 465-582)。

5.1 四层重试

第一层:设备选择重试(selectDevice

// rdma_transport.cpp 行 691-728
int RdmaTransport::selectDevice(SegmentDesc *desc, uint64_t offset,
                                size_t length, std::string_view hint,
                                int &buffer_id, int &device_id,
                                int retry_count) {
    if (desc == nullptr) return ERR_ADDRESS_NOT_REGISTERED;
    const auto &buffers = desc->buffers;
    for (buffer_id = 0; buffer_id < static_cast<int>(buffers.size()); ++buffer_id) {
        const auto &buffer = buffers[buffer_id];
        // 检查 offset 是否在 buffer 范围内
        if (offset < buffer.addr || length > buffer.length ||
            offset - buffer.addr > buffer.length - length) continue;

        // 尝试按 location 选择设备,失败则回退到 wildcard
        device_id = hint.empty()
            ? desc->topology.selectDevice(location, retry_count)
            : desc->topology.selectDevice(location, hint, retry_count);
        if (device_id >= 0) return 0;

        // 使用 wildcard location 再试一次
        device_id = hint.empty()
            ? desc->topology.selectDevice(kWildcardLocation, retry_count)
            : desc->topology.selectDevice(kWildcardLocation, hint, retry_count);
        if (device_id >= 0) return 0;
    }
    return ERR_ADDRESS_NOT_REGISTERED;
}

第二层:Slice 级别重试(主循环)

// rdma_transport.cpp 行 523-551
while (retry_cnt < kMaxRetryCount && !found_device) {
    if (selectDevice(local_segment_desc.get(),
                     (uint64_t)slice->source_addr, slice->length,
                     buffer_id, device_id, retry_cnt++))
        continue;
    // ... 检查 device 是否 active
    found_device = true;
    break;
}
if (!found_device) {
    // 清理已分配 slice,返回 AddressNotRegistered
    return Status::AddressNotRegistered(
        "Memory region not registered by any active device(s)");
}

关键参数:

第三层:Worker Pool 后提交阶段重试

Worker Pool 在 post-send 和 CQ 轮询阶段均有重试逻辑,与提交阶段共用同一 slice->rdma.retry_cnt / max_retry_cnt 计数器:

注意:CQ Flush 错误(IBV_WC_WR_FLUSH_ERR)是特殊处理——直接 markFailed() 不重试,因为这类错误发生在 QP 转换到 ERR 状态时(如 endpoint 销毁),并非真正的网络错误。

第四层:submitPostSend 后异步等待 CQ 完成

RDMA 的 submitPostSend 将 slice 提交到队列对(QP),后续通过轮询 Completion Queue 异步完成。Slice 的成功/失败回调在执行完成后触发 markSuccess/markFailed

5.2 重试配置总览

配置项默认值说明
retry_cnt9全局最大重试次数
slice_size65536 (64KB)slice 切分粒度
slice_timeout-1(禁用)slice 超时秒数
fragment_limit16384 (16KB)最后一个 slice 可与前一个合并的阈值

六、超时检测机制

超时检测在 MultiTransport::getTransferStatus() 中实现(multi_transport.cpp 行 202-216),由调用方轮询驱动而非主动中断

auto checkSliceTimeout = [&](const Transport::TransferTask& t) -> bool {
    if (globalConfig().slice_timeout <= 0) return false;  // 默认禁用
    auto current_ts = getCurrentTimeInNano();
    const int64_t kPacketDeliveryTimeout =
        globalConfig().slice_timeout * 1000000000;
    for (auto& slice : t.slice_list) {
        auto ts = slice->ts;
        if (ts > 0 && current_ts > ts &&
            current_ts - ts > kPacketDeliveryTimeout) {
            LOG(INFO) << "Slice timeout detected";
            return true;
        }
    }
    return false;
};

关键特征:


七、对端存活检测(Peer Liveness)

TE 提供对端存活探测能力,但不自动重连:

// transfer_engine_impl.cpp 行 445-451
int TransferEngineImpl::probePeerAliveByID(SegmentID target_id) {
    auto desc = metadata_->getSegmentDescByID(target_id);
    if (!desc) {
        return ERR_METADATA;
    }
    return metadata_->sendProbe(desc->name);
}

通过 metadata server 向目标发送探测消息。API 层面映射为 PeerLiveness 枚举:

// transfer_engine.h 行 37-40
enum class PeerLiveness : uint8_t {
    Alive = 0,
    Unreachable = 1,
};

调用方负责根据 liveness 结果决定后续处理策略。


八、TCP Transport 的错误处理

TCP transport 使用异常捕获 + 连接池清理的模式(tcp_transport.cpp 行 875-967):

void TcpTransport::startTransfer(Slice* slice) {
    auto desc = metadata_->getSegmentDescByID(slice->target_id);
    if (!desc) {
        slice->markFailed();    // 预检查失败直接标记
        return;
    }
    // ... RPC meta 查询、连接获取 ...

    try {
        auto session = std::make_shared<ClientSession>(socket);
        // 注册异步完成回调
        session->on_finalize_ = [slice](TransferStatusEnum status) {
            if (status == TransferStatusEnum::COMPLETED)
                slice->markSuccess();
            else
                slice->markFailed();
        };
        session->initiate(slice->source_addr, slice->tcp.dest_addr,
                          slice->length, slice->opcode);
    } catch (std::exception& e) {
        LOG(ERROR) << "Transfer exception: " << e.what();
        // 关闭异常连接
        socket->close(ec);
        // 从连接池中移除异常连接
        if (enable_connection_pool_) {
            ConnectionKey key{meta_entry.ip_or_host_name, desc->tcp_data_port};
            // ... 在连接池中查找并移除异常连接 ...
        }
        slice->markFailed();
    }
}

TCP 的错误处理特点:


九、调用方职责:关键设计哲学

TE 不做自动的 batch 级别重试。调用方的标准使用模式如下:

// 示例: transfer_engine_bench_with_retry.cpp 行 205-227
auto batch_id = engine->allocateBatchID(batch_size);
auto s = engine->submitTransfer(batch_id, requests);    // 1. 提交

if (!s.ok()) {
    LOG(INFO) << "Found Failed Requests";               // 2. 提交失败
} else {
    for (int task_id = 0; task_id < batch_size; ++task_id) {
        bool completed = false;
        TransferStatus status;
        while (!completed) {
            Status s = engine->getTransferStatus(batch_id, task_id, status);  // 3. 轮询
            LOG_ASSERT(s.ok());
            if (status.s == TransferStatusEnum::COMPLETED)
                completed = true;         // 成功
            else if (status.s == TransferStatusEnum::FAILED) {
                LOG(INFO) << "Found Failed Requests";  // 失败,调用方可重试
                completed = true;
            }
            // TIMEOUT 状态也需要调用方自行处理
        }
    }
    s = engine->freeBatchID(batch_id);                   // 4. 释放 batch
}

调用方决策矩阵:

状态含义调用方可选操作
COMPLETED传输成功释放 batch,继续下一轮
FAILED所有 slice 重试耗尽释放 batch 或重新提交
TIMEOUT超过 timeout 时间释放 batch 或重试(建议先 probePeerAlive)
WAITING仍在进行中继续轮询
INVALID请求无效检查参数后重新构造请求

十、代码验证

以下是对上述关键路径的源码级验证:

10.1 错误码定义

文件mooncake-transfer-engine/include/error.h(行 18-38)

文件mooncake-transfer-engine/include/common/base/status.h(行 33-54)

10.2 Slice 原子计数器

文件mooncake-transfer-engine/include/transport/transport.h(行 168-182)

10.3 RDMA 重试逻辑

文件mooncake-transfer-engine/src/transport/rdma_transport/rdma_transport.cpp

10.4 超时检测

文件mooncake-transfer-engine/include/config.h(行 56)

int64_t slice_timeout = -1;    // 默认禁用

文件mooncake-transfer-engine/src/multi_transport.cpp(行 202-216)

if (globalConfig().slice_timeout <= 0) return false;  // 最早短路

10.5 TCP transport 连接池异常清理

文件mooncake-transfer-engine/src/transport/tcp_transport/tcp_transport.cpp(行 933-966)

10.6 Batch 级别聚合

文件mooncake-transfer-engine/src/multi_transport.cpp(行 259-303)

10.7 Peer 存活探测

文件mooncake-transfer-engine/src/transfer_engine_impl.cpp(行 445-451)

10.8 调用方重试示例

文件mooncake-transfer-engine/example/transfer_engine_bench_with_retry.cpp(行 205-227)


十一、总结

Mooncake Transfer Engine 的错误处理策略可以概括为:

  1. 分层解耦:Slice 级别自动重试,Task/Batch 级别状态聚合,Transport 层独立实现,调用方最终决策
  2. 异步轮询:传输提交几乎不阻塞,完成状态通过轮询获取,不引入主动中断或回调
  3. 原子无锁:Slice 的成功/失败计数使用 __atomic_fetch_add,避免锁竞争
  4. 默认保守:slice_timeout 默认禁用(-1),需要调用方自行启用
  5. 不做自动恢复:不自动重试 batch,不自动重连断开的对端,一切由上层控制
  6. 内存复用:Slice 对象通过 ThreadLocalSliceCache(容量 4096)环形缓冲区复用,减少分配开销

修订说明

本文基于审阅意见(04-review-notes.md)对以下内容进行了修订:

  1. Section 10.1:错误码宏数量从”16”更正为”17”(与 Section 2.1 表格及 error.h 源码一致)
  2. Section 2.2:修正”Status::Code 与 error.h 完全对应”的表述,标注两处例外:kNotImplemented=999ERR_NOT_IMPLEMENTED=-303 数值不同,kNotSupportedTransport=8 在 error.h 中无对应 C 宏
  3. Section 4.4:传输协议数量从”20+ 种”更正为”15 种”(与 installTransport() 源码一致)
  4. Section 5.1:重试层数从”三层”扩展为”四层”,补充 Worker Pool 后提交阶段重调度(performPostSend/performPollCqredispatch
  5. Section 9:补充示例代码中省略的 Status s = 返回值捕获和 LOG_ASSERT(s.ok()) 断言
  6. Section 10.3:修正三处行号引用(主循环结束行 551→540,selectDevice 结束行 727→728)

Share this post on:

Previous Post
Mooncake TE 阅读手记-10-QP Depth 与 Slice 处理
Next Post
Mooncake TE 阅读手记-08-握手协议与 QP 状态机