Apache RocketMQ 定时消息功能的演进:从固定延迟到任意时间戳与消息撤回的深度解析

gomkiri 发布于 2025-11-02 46 次阅读


AI 摘要

从固定延迟到任意时间戳,Apache RocketMQ定时消息功能如何实现架构革新?深度解析4.x固定等级到5.0任意精度的演进历程,揭秘Chronos模块与gRPC协议如何突破2小时限制,实现毫秒级精准投递。

Apache RocketMQ 作为业界领先的分布式消息与流处理平台,其定时/延时消息功能是构建复杂业务场景(如订单超时处理、分布式定时调度)的核心组件。该功能并非一蹴而就,而是经历了一个从受限到灵活、从简单投递到可精细化管理的显著演进过程。本次将深入剖析 RocketMQ 定时消息功能的三个关键发展阶段,并提供每个阶段对应的 Java 核心操作代码。

阶段一:RocketMQ 4.x 时代 —— 固定延迟等级的实现

定时消息功能在 Apache RocketMQ 4.x 版本中已经存在,但此时它被称为“延时消息发送”(Delayed Message Sending)1

1.1 业务场景与架构实现

在 4.x 时代,定时消息的核心目的是支持有限的、非高精度的延时业务,例如电商交易中常见的“订单创建后30分钟未支付则自动关闭”的场景 2

其架构实现并非一个真正的、可管理任意时间的定时器系统,而是一种巧妙但有限的“变通”方案:

  1. 内部主题:Broker 端预定义了一个名为 SCHEDULE_TOPIC_XXXX 的内部主题 5
  2. 固定队列:该主题默认被划分为18个消息队列(Message Queue),每个队列对应一个固定的延迟时间 1
  3. 消息重定向:当生产者(Producer)发送一个延时消息时(例如设置 setDelayTimeLevel(3)),Broker 会拦截该消息,将其原始的 Topic 和队列信息存储起来,然后将消息体转储到 SCHEDULE_TOPIC_XXXX 主题中对应的队列(例如,Level 3 对应 Queue 2)。
  4. 定时扫描:Broker 内部有一个专用的定时调度线程(Scheduled thread)5,该线程会按照18个队列各自固定的周期(如1s, 5s, 10s...)进行轮询。
  5. 消息恢复与投递:当定时器扫描到某个队列中的消息已达到其预设的投递时间,调度器会从消息中恢复其原始的 Topic,并将该消息重新作为一条普通消息投递到目标 Topic 的 CommitLog 中,此时消费者(Consumer)才能拉取到该消息 5

1.2 18级固定延迟

这种架构的核心约束在于,用户不能指定任意的延时时间,而必须从一个预设的、固定的等级列表中选择。该配置定义在 Broker 的 MessageStoreConfig 中 3

表1:RocketMQ 4.x 默认的18个固定延迟等级 1

等级 (Level)延迟时间 (Delay Time)等级 (Level)延迟时间 (Delay Time)
11s106min
25s117min
310s128min
430s139min
51min1410min
62min1520min
73min1630min
84min171h
95min182h

这种设计的权衡非常明显:实现简单、Broker 资源开销极低(仅需18个定时轮询任务),但牺牲了用户侧的灵活性。例如,用户无法实现“35分钟”的延时,也无法超过2小时的最大延时 1。这种严重的局限性是推动 RocketMQ 5.0 进行架构革新的根本原因。

1.3 Java 操作指南 (4.x Remoting SDK)

在 4.x 版本中,使用 org.apache.rocketmq:rocketmq-client 依赖发送固定延时消息。

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

public class ScheduledMessageProducer4x {
    public static void main(String args) throws Exception {
        // 1. 实例化生产者
        DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
        // 2. 设置 NameServer 地址
        producer.setNamesrvAddr("localhost:9876");
        // 3. 启动生产者
        producer.start();

        Message message = new Message("TestTopic",
                                    "Hello scheduled message".getBytes());

        // 4. 【核心】设置延时等级
        // 级别 3 对应表1中的 10s 延时
        message.setDelayTimeLevel(3);

        // 5. 发送消息
        producer.send(message);
       
        System.out.println("延时消息已发送 (10s 后投递)");
        // 6. 关闭生产者
        producer.shutdown();
    }
}
Java

阶段二:RocketMQ 5.0 —— 任意精度定时与最大延时变更

随着 RocketMQ 5.0 向云原生、“消息、事件、流”融合的架构演进 6,定时消息功能迎来了革命性的变化。

2.1 与 5.0.0 版本的发布

用户对任意时间精度的需求最终被 Apache RocketMQ 社区通过 Support Timing Messages with Arbitrary Time Delay(支持任意时间延时的定时消息)提案所采纳 9

该功能在 Apache RocketMQ 5.0.0 版本(2022年9月发布)中正式引入 9这是 RocketMQ 首次在开源版本中支持任意修改延时时间

2.2 全新架构:时间戳、gRPC 与 Chronos

5.0 版本的定时消息彻底摒弃了 4.x 的“延时等级”模型,转而采用“绝对投递时间戳”模型。

  1. API 变更:API 不再要求传入一个“等级”(level),而是要求传入一个毫秒级的 Unix 时间戳(Timestamp),代表消息应被投递的“绝对时刻” 11
  2. SDK 要求:此功能必须使用 5.x 全新的 gRPC 协议 SDK (org.apache.rocketmq:rocketmq-client-java) 12。旧的 4.x DefaultMQProducer 客户端无法使用此新特性。
  3. Broker 架构 (Chronos):为了支持对海量、具有不同毫秒级时间戳的消息进行高效索引和检索,5.0 架构引入了名为 "Chronos" 的延时消息模块 13。该模块使用 RocksDB (一种高性能的 LSM-Tree 键值存储) 13
  4. 工作机制:消息不再进入 SCHEDULE_TOPIC_XXXX,而是被 Chronos 模块接收,并以 (DeliveryTimestamp, MessageId) 作为索引键存入 RocksDB。定时器只需高效地扫描 RocksDB 中时间戳到期的消息,并将其投递回原始 Topic,从而实现了任意时间的毫秒级精度 11
  5. 最大延时变更:得益于新架构,4.x 的2小时限制被打破。在 5.0 中,默认的最大可设置延时时间调整为24小时 11。一些基于此架构的云服务商甚至允许设置长达一年的定时消息 2

2.3 Java 操作指南 (5.x gRPC SDK)

使用 org.apache.rocketmq:rocketmq-client-java (例如 5.0.7 或更高版本) 15 依赖。

import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
import org.apache.rocketmq.client.apis.message.Message;
import java.time.Duration;
import java.time.Instant;

public class ScheduledMessageProducer5x {
    public static void main(String args) throws Exception {
        // 1. 5.x SDK 需要 gRPC Endpoint,指向 Proxy (默认 8081)
        // 注意:不再是 4.x 的 NameServer (9876)
        String endpoint = "localhost:8081";
        ClientServiceProvider provider = ClientServiceProvider.loadService();
        ClientConfiguration config = ClientConfiguration.newBuilder()
                                          .setEndpoints(endpoint)
                                          .build();
       
        // 2. 实例化 5.x gRPC 生产者
        Producer producer = provider.newBuilder(config).build();

        // 3. 计算绝对投递时间戳
        // 例如:10 分钟后投递
        long tenMinutesFromNow = Instant.now().plus(Duration.ofMinutes(10)).toEpochMilli();

        // 4. 使用 MessageBuilder 构建消息
        Message message = provider.newMessageBuilder()
          .setTopic("TestTopic")
          .setKeys("messageKey")
          .setTag("messageTag")
            // 5. 【核心】设置绝对投递时间戳 (毫秒)
            // 这是取代 setDelayTimeLevel 的新 API
          .setDeliveryTimestamp(tenMinutesFromNow)
          .setBody("Hello precise scheduled message".getBytes())
          .build();

        // 6. 发送消息
        SendReceipt sendReceipt = producer.send(message);
        System.out.println("定时消息已发送, MsgID: " + sendReceipt.getMessageId());

        // 7. 关闭生产者
        producer.close();
    }
}
Java

2.3.1 补充说明:__STARTDELIVERTIME 属性

在研究中可能会发现一些(尤其是云厂商的)文档使用 msg.putUserProperty("__STARTDELIVERTIME",...) 来设置任意时间 2

这是一种历史遗留的“hack”方式。在 5.0.0 开源版本正式支持任意时间之前,一些云服务提供商(如腾讯云 TDMQ)为了满足客户需求,自行扩展了 4.x 客户端,通过一个特殊的 User Property (__STARTDELIVERTIME) 来传递时间戳 18

在开发全新的 Apache RocketMQ 5.x 应用时,应始终使用 messageBuilder.setDeliveryTimestamp() API,而非 putUserProperty 方式。

2.4 关于最大延时时间

Apache RocketMQ 5.0 版本文档中明确说明了:定时时长最大值默认为24小时,不支持自定义修改。但是这种描述与开源社区版本的实际 Broker 配置存在差异,在 5.0.0 版本的 Broker 源码中引入了一个关键参数 timerMaxDelaySec。

  1. 默认值差异:此参数在代码中的默认值通常被硬编码为 3 天(3600 * 24 * 3,即 259200 秒或 259200000 毫秒),而非文档所称的 24 小时。
  2. 可配置性:这个 timerMaxDelaySec 参数是可以通过 Broker 的 broker.properties 配置文件进行修改的。如果运维人员未修改此配置,而生产者发送的延时(例如4天)超过这个值,就会收到类似 timer message illegal, the delay time should not be bigger than the max delay... 的错误。
  3. 配置清理:这种文档(24小时)与代码(3天)的不一致,以及可能存在的另一个废弃参数 (BrokerConfig.maxDelayTime) 25,最终在 5.1.1 等后续版本中得到了社区的关注和清理。

阶段三:RocketMQ 5.3.2 —— 增加定时消息可取消功能

随着 5.0 架构的普及,一个新的痛点浮现:定时消息一旦发送便无法撤回。例如,用户创建订单时发送了30分钟的超时关闭消息,但用户在第5分钟支付了订单。此时,应用层希望能够取消那条将在25分钟后触发的“关闭订单”消息。

在 5.3.2 版本之前,这在开源版本中是无法做到的 5。唯一的办法是让消费者接收到消息后,再通过业务逻辑检查订单状态,决定是否执行(一种被动的“检查回退”模式)4

3.1 架构前提与 5.3.2 版本发布

架构前提:取消功能在 4.x 时代是不可想象的,因为消息在 SCHEDULE_TOPIC_XXXX 中是匿名的,无法被精确定位 5。而 5.0 的 Chronos/RocksDB 架构 13 使得每一条定时消息都通过其 MessageId 变得可索引、可寻址。这为“取消”功能提供了必要的技术前提。

版本里程碑Apache RocketMQ 5.3.2(2025年3月发布)的发行说明 (Release Notes) 中明确指出,该版本“adds support for the ability to cancel scheduled messages”(增加了取消定时消息的能力)20

这是 RocketMQ 首次在开源版本中支持定时消息撤回

3.2 "Recalling API" 的实现

这一功能在 5.x gRPC 客户端 (rocketmq-clients) 中被称为 "Recalling API"(召回 API)21

根据 rocketmq-clients 仓库的提交记录,该功能的核心实现是 The java implementation of recalling API(Issue #878 的 Java 召回 API 实现),对应 Pull Request #879 22

底层的 gRPC 接口增加了一个 RecallMessage RPC 调用 24。在 Java 客户端,这被封装为 Producer 接口上的一个新方法(如 recallMessage)。

3.3 Java 操作指南 (5.x gRPC SDK v5.3.2+)

要使用此功能,必须使用 5.3.2 或更高版本的 Broker,以及配套的 rocketmq-client-java (gRPC) SDK。

关键前提:必须在发送定时消息时,保存返回的 SendReceipt 对象,因为撤回操作需要 SendReceipt 中包含的 MessageId 和消息句柄(Handle)信息。

(注:鉴于该功能较新,以下是基于 5.x gRPC SDK 21 和 Recalling API 22 的 API 规范所构建的标准操作示例。)

import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
import org.apache.rocketmq.client.apis.message.Message;
import java.time.Duration;
import java.time.Instant;

public class CancelScheduledMessage532 {
   
    public static void main(String args) throws Exception {
       
        String endpoint = "localhost:8081";
        ClientServiceProvider provider = ClientServiceProvider.loadService();
        ClientConfiguration config = ClientConfiguration.newBuilder()
                                          .setEndpoints(endpoint)
                                          .build();

        // 1. 实例化 5.x gRPC 生产者 (版本需支持 5.3.2+)
        Producer producer = provider.newBuilder(config).build();

        // 2. 构建一个 10 分钟后投递的定时消息
        long deliverTimestamp = Instant.now().plus(Duration.ofMinutes(10)).toEpochMilli();
        Message message = provider.newMessageBuilder()
          .setTopic("TestTopic")
          .setDeliveryTimestamp(deliverTimestamp)
          .setBody("This message might be cancelled".getBytes())
          .build();

        // 3. 发送定时消息,并【必须】保存 SendReceipt
        SendReceipt sendReceipt = producer.send(message);
        System.out.println("定时消息已发送, MsgID: " + sendReceipt.getMessageId());

        //... 假设在 10 分钟内,业务逻辑发生变化 (例如订单已支付)
        //... 现在需要取消该定时消息
       
        try {
            // 4. 【核心】调用 recallMessage 方法
            // 该方法是 5.3.2+ 版本 gRPC SDK 的一部分
            // 它需要原始的 SendReceipt 来定位消息
            producer.recallMessage(sendReceipt);
           
            System.out.println("定时消息已成功撤回, MsgID: " + sendReceipt.getMessageId());

        } catch (Exception e) {
            // 撤回可能会失败,例如:
            // 1. 消息已经被投递(已过定时时间)
            // 2. 消息句柄无效或网络问题
            System.err.println("撤回消息失败: " + e.getMessage());
        }

        // 5. 关闭生产者
        producer.close();
    }
}
Java

(注:某些云厂商(如华为云 2)在 5.3.2 之前提供了私有的取消方式,即发送一条带有 __CANCEL_SCHEDULED_MSG 属性的“命令消息”2。这与 Apache RocketMQ 5.3.2 的原生 recallMessage RPC 属不同实现,不应混淆。)

总结:RocketMQ 定时消息的演进矩阵

本报告分析了 RocketMQ 定时消息从 4.x 到 5.3.2 的演进脉络。其发展路径清晰地反映了 RocketMQ 从一个简单的消息总线向一个功能完备的、企业级的云原生事件平台的战略转型。

表2:RocketMQ 定时消息功能演进矩阵

功能特性关键版本核心 SDK (Java)关键 API 方法最大延时 (默认)是否可取消
固定延迟等级4.x (如 4.9.x) 1org.apache.rocketmq:rocketmq-clientmessage.setDelayTimeLevel(int level) 12 小时 (Level 18) 1 5
任意时间戳定时5.0.0 9org.apache.rocketmq:rocketmq-client-java (gRPC)messageBuilder.setDeliveryTimestamp(long ts) [17]24 小时 11否 (5.3.2 之前)
定时消息撤回5.3.2 20org.apache.rocketmq:rocketmq-client-java (gRPC)producer.recallMessage(SendReceipt receipt) 2224 小时

架构师建议

  1. 新项目选型:对于所有新启动的项目,强烈建议直接使用 Apache RocketMQ 5.3.2 或更高版本的 Broker,并配合使用 5.x gRPC SDK (rocketmq-client-java)。这不仅提供了任意时间精度的定时能力,也提供了关键的“撤回”能力,使业务Saga可以形成完整的闭环。
  2. 4.x 迁移评估:对于仍在 4.x 上运行的存量系统,迁移至 5.x 并非简单的版本升级。它涉及 gRPC Proxy 的部署 16、客户端 SDK 的完全替换(从 Remoting 到 gRPC 12)以及所有定时消息逻辑的重写。架构师应评估迁移的必要性,仅当业务强依赖以下三个特性时,才值得投入迁移成本:
  • 需要超过2小时的延时。
  • 需要固定等级(如 10s, 30s, 1m...)之外的任意时间精度。
  • 需要对已发送的定时消息执行“取消”操作。

引用的著作

  1. Delayed Message Sending - Apache RocketMQ, 访问时间为 十一月, 2025, https://rocketmq.apache.org/docs/4.x/producer/04message3/
  2. Delivering Scheduled Messages_Java (TCP)_Developer Guide_Distributed Message Service for RocketMQ-Huawei Cloud, 访问时间为 十一月, 2025, https://support.huaweicloud.com/eu/devg-hrm/hrm-devg-009.html
  3. Schedule example | Study, 访问时间为 十一月, 2025, https://linqiankun.github.io/hexoblog/yuque/rocketMq/Schedule%20example/
  4. Delivering Scheduled Messages - Cloud Service Help Center - Huawei, 访问时间为 十一月, 2025, https://doc.hcs.huawei.com/devg/dms/hrm-devg-009.html
  5. How to deal with rocketmq message accumulation and message delay #3095 - GitHub, 访问时间为 十一月 2, 2025, https://github.com/apache/rocketmq/issues/3095
  6. Apache RocketMQ 5.0: Evolution of a messaging, event, and stream converged high availability architecture - ApacheCon Asia 2022 - Powered by ALC-Beijing, 访问时间为 十一月, 2025, https://apachecon.com/acasia2022/sessions/messaging-1052.html
  7. Why choose RocketMQ, 访问时间为 十一月, 2025, https://rocketmq.apache.org/docs/
  8. RocketMQ · 官方网站 | RocketMQ - The Apache Software Foundation, 访问时间为 十一月, 2025, https://rocketmq.apache.org/
  9. Release Notes - Apache RocketMQ - Version 5.0.0, 访问时间为 十一月, 2025, https://rocketmq.apache.org/release-notes/2022/09/09/5.0.0/
  10. RocketMQ Improvement Proposal - GitHub, 访问时间为 十一月, 2025, https://github.com/apache/rocketmq/wiki/RocketMQ-Improvement-Proposal
  11. Delay Message - Apache RocketMQ, 访问时间为 十一月, 2025, https://rocketmq.apache.org/docs/featureBehavior/02delaymessage/
  12. Java Client SDK - Apache RocketMQ, 访问时间为 十一月, 2025, https://rocketmq.apache.org/docs/sdk/02java/
  13. Common Implementation Schemes of Delayed Messages - Ric.Studio 进击的程序员笔记, 访问时间为 十一月 2, 2025, https://segmentfault.com/a/1190000041282378/en
  14. What Is DMS for RocketMQ? - Distributed Message Service (DMS) 2.3.0.1 Usage Guide (for Huawei Cloud Stack 8.5.1) 01, 访问时间为 十一月, 2025, https://info.support.huawei.com/enterprise/en/doc/EDOC1100450007/85505261/what-is-dms-for-rocketmq
  15. ApsaraMQ for RocketMQ:Step 3: Use an SDK to send and receive messages - Alibaba Cloud, 访问时间为 十一月, 2025, https://www.alibabacloud.com/help/en/apsaramq-for-rocketmq/cloud-message-queue-rocketmq-5-x-series/getting-started/step-3-use-sdks-to-send-and-receive-messages
  16. Run RocketMQ locally, 访问时间为 十一月, 2025, https://rocketmq.apache.org/docs/quickStart/01quickstart/
  17. ApsaraMQ for RocketMQ:Scheduled and delayed messages - Alibaba Cloud, 访问时间为 十一月, 2025, https://www.alibabacloud.com/help/en/apsaramq-for-rocketmq/cloud-message-queue-rocketmq-5-x-series/developer-reference/scheduled-and-delayed-messages
  18. Scheduled Message and Delayed Message - Tencent Cloud, 访问时间为 十一月, 2025, https://www.tencentcloud.com/document/product/1113/58502
  19. how to delete delayqueue message #1583 - apache/rocketmq - GitHub, 访问时间为 十一月 2, 2025, https://github.com/apache/rocketmq/issues/1583
  20. Release Notes - Apache RocketMQ - Version 5.3.2, 访问时间为 十一月, 2025, https://rocketmq.apache.org/release-notes/2025/03/08/5.3.2/
  21. Collection of Client Bindings for Apache RocketMQ - GitHub, 访问时间为 十一月, 2025, https://github.com/apache/rocketmq-clients
  22. Releases · apache/rocketmq-clients - GitHub, 访问时间为 十一月, 2025, https://github.com/apache/rocketmq-clients/releases
  23. [Feature] Implement Recalling API in Java SDK · Issue #878 · apache/rocketmq-clients, 访问时间为 十一月, 2025, https://github.com/apache/rocketmq-clients/issues/878
  24. Diff - d2f95889b4852251ba5c526ef790ea8735b42100^! - rocketmq, 访问时间为 十一月, 2025, https://apache.googlesource.com/rocketmq-clients/+/d2f95889b4852251ba5c526ef790ea8735b42100%5E%21/
  25. Release Notes - Apache RocketMQ - Version 5.1.1, 访问时间为 十一月, 2025, https://rocketmq.apache.org/release-notes/2023/05/15/5.1.1/