Apache RocketMQ 作为业界领先的分布式消息与流处理平台,其定时/延时消息功能是构建复杂业务场景(如订单超时处理、分布式定时调度)的核心组件。该功能并非一蹴而就,而是经历了一个从受限到灵活、从简单投递到可精细化管理的显著演进过程。本次将深入剖析 RocketMQ 定时消息功能的三个关键发展阶段,并提供每个阶段对应的 Java 核心操作代码。
阶段一:RocketMQ 4.x 时代 —— 固定延迟等级的实现
定时消息功能在 Apache RocketMQ 4.x 版本中已经存在,但此时它被称为“延时消息发送”(Delayed Message Sending)1。
1.1 业务场景与架构实现
在 4.x 时代,定时消息的核心目的是支持有限的、非高精度的延时业务,例如电商交易中常见的“订单创建后30分钟未支付则自动关闭”的场景 2。
其架构实现并非一个真正的、可管理任意时间的定时器系统,而是一种巧妙但有限的“变通”方案:
- 内部主题:Broker 端预定义了一个名为 SCHEDULE_TOPIC_XXXX 的内部主题 5。
- 固定队列:该主题默认被划分为18个消息队列(Message Queue),每个队列对应一个固定的延迟时间 1。
- 消息重定向:当生产者(Producer)发送一个延时消息时(例如设置 setDelayTimeLevel(3)),Broker 会拦截该消息,将其原始的 Topic 和队列信息存储起来,然后将消息体转储到 SCHEDULE_TOPIC_XXXX 主题中对应的队列(例如,Level 3 对应 Queue 2)。
- 定时扫描:Broker 内部有一个专用的定时调度线程(Scheduled thread)5,该线程会按照18个队列各自固定的周期(如1s, 5s, 10s...)进行轮询。
- 消息恢复与投递:当定时器扫描到某个队列中的消息已达到其预设的投递时间,调度器会从消息中恢复其原始的 Topic,并将该消息重新作为一条普通消息投递到目标 Topic 的 CommitLog 中,此时消费者(Consumer)才能拉取到该消息 5。
1.2 18级固定延迟
这种架构的核心约束在于,用户不能指定任意的延时时间,而必须从一个预设的、固定的等级列表中选择。该配置定义在 Broker 的 MessageStoreConfig 中 3。
表1:RocketMQ 4.x 默认的18个固定延迟等级 1
| 等级 (Level) | 延迟时间 (Delay Time) | 等级 (Level) | 延迟时间 (Delay Time) |
| 1 | 1s | 10 | 6min |
| 2 | 5s | 11 | 7min |
| 3 | 10s | 12 | 8min |
| 4 | 30s | 13 | 9min |
| 5 | 1min | 14 | 10min |
| 6 | 2min | 15 | 20min |
| 7 | 3min | 16 | 30min |
| 8 | 4min | 17 | 1h |
| 9 | 5min | 18 | 2h |
这种设计的权衡非常明显:实现简单、Broker 资源开销极低(仅需18个定时轮询任务),但牺牲了用户侧的灵活性。例如,用户无法实现“35分钟”的延时,也无法超过2小时的最大延时 1。这种严重的局限性是推动 RocketMQ 5.0 进行架构革新的根本原因。
1.3 Java 操作指南 (4.x Remoting SDK)
在 4.x 版本中,使用 org.apache.rocketmq:rocketmq-client 依赖发送固定延时消息。
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
public class ScheduledMessageProducer4x {
public static void main(String args) throws Exception {
// 1. 实例化生产者
DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
// 2. 设置 NameServer 地址
producer.setNamesrvAddr("localhost:9876");
// 3. 启动生产者
producer.start();
Message message = new Message("TestTopic",
"Hello scheduled message".getBytes());
// 4. 【核心】设置延时等级
// 级别 3 对应表1中的 10s 延时
message.setDelayTimeLevel(3);
// 5. 发送消息
producer.send(message);
System.out.println("延时消息已发送 (10s 后投递)");
// 6. 关闭生产者
producer.shutdown();
}
}
Java阶段二:RocketMQ 5.0 —— 任意精度定时与最大延时变更
随着 RocketMQ 5.0 向云原生、“消息、事件、流”融合的架构演进 6,定时消息功能迎来了革命性的变化。
2.1 与 5.0.0 版本的发布
用户对任意时间精度的需求最终被 Apache RocketMQ 社区通过 Support Timing Messages with Arbitrary Time Delay(支持任意时间延时的定时消息)提案所采纳 9。
该功能在 Apache RocketMQ 5.0.0 版本(2022年9月发布)中正式引入 9。这是 RocketMQ 首次在开源版本中支持任意修改延时时间。
2.2 全新架构:时间戳、gRPC 与 Chronos
5.0 版本的定时消息彻底摒弃了 4.x 的“延时等级”模型,转而采用“绝对投递时间戳”模型。
- API 变更:API 不再要求传入一个“等级”(level),而是要求传入一个毫秒级的 Unix 时间戳(Timestamp),代表消息应被投递的“绝对时刻” 11。
- SDK 要求:此功能必须使用 5.x 全新的 gRPC 协议 SDK (org.apache.rocketmq:rocketmq-client-java) 12。旧的 4.x DefaultMQProducer 客户端无法使用此新特性。
- Broker 架构 (Chronos):为了支持对海量、具有不同毫秒级时间戳的消息进行高效索引和检索,5.0 架构引入了名为 "Chronos" 的延时消息模块 13。该模块使用 RocksDB (一种高性能的 LSM-Tree 键值存储) 13。
- 工作机制:消息不再进入 SCHEDULE_TOPIC_XXXX,而是被 Chronos 模块接收,并以 (DeliveryTimestamp, MessageId) 作为索引键存入 RocksDB。定时器只需高效地扫描 RocksDB 中时间戳到期的消息,并将其投递回原始 Topic,从而实现了任意时间的毫秒级精度 11。
- 最大延时变更:得益于新架构,4.x 的2小时限制被打破。在 5.0 中,默认的最大可设置延时时间调整为24小时 11。一些基于此架构的云服务商甚至允许设置长达一年的定时消息 2。
2.3 Java 操作指南 (5.x gRPC SDK)
使用 org.apache.rocketmq:rocketmq-client-java (例如 5.0.7 或更高版本) 15 依赖。
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
import org.apache.rocketmq.client.apis.message.Message;
import java.time.Duration;
import java.time.Instant;
public class ScheduledMessageProducer5x {
public static void main(String args) throws Exception {
// 1. 5.x SDK 需要 gRPC Endpoint,指向 Proxy (默认 8081)
// 注意:不再是 4.x 的 NameServer (9876)
String endpoint = "localhost:8081";
ClientServiceProvider provider = ClientServiceProvider.loadService();
ClientConfiguration config = ClientConfiguration.newBuilder()
.setEndpoints(endpoint)
.build();
// 2. 实例化 5.x gRPC 生产者
Producer producer = provider.newBuilder(config).build();
// 3. 计算绝对投递时间戳
// 例如:10 分钟后投递
long tenMinutesFromNow = Instant.now().plus(Duration.ofMinutes(10)).toEpochMilli();
// 4. 使用 MessageBuilder 构建消息
Message message = provider.newMessageBuilder()
.setTopic("TestTopic")
.setKeys("messageKey")
.setTag("messageTag")
// 5. 【核心】设置绝对投递时间戳 (毫秒)
// 这是取代 setDelayTimeLevel 的新 API
.setDeliveryTimestamp(tenMinutesFromNow)
.setBody("Hello precise scheduled message".getBytes())
.build();
// 6. 发送消息
SendReceipt sendReceipt = producer.send(message);
System.out.println("定时消息已发送, MsgID: " + sendReceipt.getMessageId());
// 7. 关闭生产者
producer.close();
}
}
Java2.3.1 补充说明:__STARTDELIVERTIME 属性
在研究中可能会发现一些(尤其是云厂商的)文档使用 msg.putUserProperty("__STARTDELIVERTIME",...) 来设置任意时间 2。
这是一种历史遗留的“hack”方式。在 5.0.0 开源版本正式支持任意时间之前,一些云服务提供商(如腾讯云 TDMQ)为了满足客户需求,自行扩展了 4.x 客户端,通过一个特殊的 User Property (__STARTDELIVERTIME) 来传递时间戳 18。
在开发全新的 Apache RocketMQ 5.x 应用时,应始终使用 messageBuilder.setDeliveryTimestamp() API,而非 putUserProperty 方式。
2.4 关于最大延时时间
在 Apache RocketMQ 5.0 版本文档中明确说明了:定时时长最大值默认为24小时,不支持自定义修改。但是这种描述与开源社区版本的实际 Broker 配置存在差异,在 5.0.0 版本的 Broker 源码中引入了一个关键参数 timerMaxDelaySec。
- 默认值差异:此参数在代码中的默认值通常被硬编码为 3 天(3600 * 24 * 3,即 259200 秒或 259200000 毫秒),而非文档所称的 24 小时。
- 可配置性:这个 timerMaxDelaySec 参数是可以通过 Broker 的 broker.properties 配置文件进行修改的。如果运维人员未修改此配置,而生产者发送的延时(例如4天)超过这个值,就会收到类似 timer message illegal, the delay time should not be bigger than the max delay... 的错误。
- 配置清理:这种文档(24小时)与代码(3天)的不一致,以及可能存在的另一个废弃参数 (BrokerConfig.maxDelayTime) 25,最终在 5.1.1 等后续版本中得到了社区的关注和清理。
阶段三:RocketMQ 5.3.2 —— 增加定时消息可取消功能
随着 5.0 架构的普及,一个新的痛点浮现:定时消息一旦发送便无法撤回。例如,用户创建订单时发送了30分钟的超时关闭消息,但用户在第5分钟支付了订单。此时,应用层希望能够取消那条将在25分钟后触发的“关闭订单”消息。
在 5.3.2 版本之前,这在开源版本中是无法做到的 5。唯一的办法是让消费者接收到消息后,再通过业务逻辑检查订单状态,决定是否执行(一种被动的“检查回退”模式)4。
3.1 架构前提与 5.3.2 版本发布
架构前提:取消功能在 4.x 时代是不可想象的,因为消息在 SCHEDULE_TOPIC_XXXX 中是匿名的,无法被精确定位 5。而 5.0 的 Chronos/RocksDB 架构 13 使得每一条定时消息都通过其 MessageId 变得可索引、可寻址。这为“取消”功能提供了必要的技术前提。
版本里程碑:Apache RocketMQ 5.3.2(2025年3月发布)的发行说明 (Release Notes) 中明确指出,该版本“adds support for the ability to cancel scheduled messages”(增加了取消定时消息的能力)20。
这是 RocketMQ 首次在开源版本中支持定时消息撤回。
3.2 "Recalling API" 的实现
这一功能在 5.x gRPC 客户端 (rocketmq-clients) 中被称为 "Recalling API"(召回 API)21。
根据 rocketmq-clients 仓库的提交记录,该功能的核心实现是 The java implementation of recalling API(Issue #878 的 Java 召回 API 实现),对应 Pull Request #879 22。
底层的 gRPC 接口增加了一个 RecallMessage RPC 调用 24。在 Java 客户端,这被封装为 Producer 接口上的一个新方法(如 recallMessage)。
3.3 Java 操作指南 (5.x gRPC SDK v5.3.2+)
要使用此功能,必须使用 5.3.2 或更高版本的 Broker,以及配套的 rocketmq-client-java (gRPC) SDK。
关键前提:必须在发送定时消息时,保存返回的 SendReceipt 对象,因为撤回操作需要 SendReceipt 中包含的 MessageId 和消息句柄(Handle)信息。
(注:鉴于该功能较新,以下是基于 5.x gRPC SDK 21 和 Recalling API 22 的 API 规范所构建的标准操作示例。)
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
import org.apache.rocketmq.client.apis.message.Message;
import java.time.Duration;
import java.time.Instant;
public class CancelScheduledMessage532 {
public static void main(String args) throws Exception {
String endpoint = "localhost:8081";
ClientServiceProvider provider = ClientServiceProvider.loadService();
ClientConfiguration config = ClientConfiguration.newBuilder()
.setEndpoints(endpoint)
.build();
// 1. 实例化 5.x gRPC 生产者 (版本需支持 5.3.2+)
Producer producer = provider.newBuilder(config).build();
// 2. 构建一个 10 分钟后投递的定时消息
long deliverTimestamp = Instant.now().plus(Duration.ofMinutes(10)).toEpochMilli();
Message message = provider.newMessageBuilder()
.setTopic("TestTopic")
.setDeliveryTimestamp(deliverTimestamp)
.setBody("This message might be cancelled".getBytes())
.build();
// 3. 发送定时消息,并【必须】保存 SendReceipt
SendReceipt sendReceipt = producer.send(message);
System.out.println("定时消息已发送, MsgID: " + sendReceipt.getMessageId());
//... 假设在 10 分钟内,业务逻辑发生变化 (例如订单已支付)
//... 现在需要取消该定时消息
try {
// 4. 【核心】调用 recallMessage 方法
// 该方法是 5.3.2+ 版本 gRPC SDK 的一部分
// 它需要原始的 SendReceipt 来定位消息
producer.recallMessage(sendReceipt);
System.out.println("定时消息已成功撤回, MsgID: " + sendReceipt.getMessageId());
} catch (Exception e) {
// 撤回可能会失败,例如:
// 1. 消息已经被投递(已过定时时间)
// 2. 消息句柄无效或网络问题
System.err.println("撤回消息失败: " + e.getMessage());
}
// 5. 关闭生产者
producer.close();
}
}
Java(注:某些云厂商(如华为云 2)在 5.3.2 之前提供了私有的取消方式,即发送一条带有 __CANCEL_SCHEDULED_MSG 属性的“命令消息”2。这与 Apache RocketMQ 5.3.2 的原生 recallMessage RPC 属不同实现,不应混淆。)
总结:RocketMQ 定时消息的演进矩阵
本报告分析了 RocketMQ 定时消息从 4.x 到 5.3.2 的演进脉络。其发展路径清晰地反映了 RocketMQ 从一个简单的消息总线向一个功能完备的、企业级的云原生事件平台的战略转型。
表2:RocketMQ 定时消息功能演进矩阵
| 功能特性 | 关键版本 | 核心 SDK (Java) | 关键 API 方法 | 最大延时 (默认) | 是否可取消 |
| 固定延迟等级 | 4.x (如 4.9.x) 1 | org.apache.rocketmq:rocketmq-client | message.setDelayTimeLevel(int level) 1 | 2 小时 (Level 18) 1 | 否 5 |
| 任意时间戳定时 | 5.0.0 9 | org.apache.rocketmq:rocketmq-client-java (gRPC) | messageBuilder.setDeliveryTimestamp(long ts) [17] | 24 小时 11 | 否 (5.3.2 之前) |
| 定时消息撤回 | 5.3.2 20 | org.apache.rocketmq:rocketmq-client-java (gRPC) | producer.recallMessage(SendReceipt receipt) 22 | 24 小时 | 是 |
架构师建议
- 新项目选型:对于所有新启动的项目,强烈建议直接使用 Apache RocketMQ 5.3.2 或更高版本的 Broker,并配合使用 5.x gRPC SDK (rocketmq-client-java)。这不仅提供了任意时间精度的定时能力,也提供了关键的“撤回”能力,使业务Saga可以形成完整的闭环。
- 4.x 迁移评估:对于仍在 4.x 上运行的存量系统,迁移至 5.x 并非简单的版本升级。它涉及 gRPC Proxy 的部署 16、客户端 SDK 的完全替换(从 Remoting 到 gRPC 12)以及所有定时消息逻辑的重写。架构师应评估迁移的必要性,仅当业务强依赖以下三个特性时,才值得投入迁移成本:
- 需要超过2小时的延时。
- 需要固定等级(如 10s, 30s, 1m...)之外的任意时间精度。
- 需要对已发送的定时消息执行“取消”操作。
引用的著作
- Delayed Message Sending - Apache RocketMQ, 访问时间为 十一月, 2025, https://rocketmq.apache.org/docs/4.x/producer/04message3/
- Delivering Scheduled Messages_Java (TCP)_Developer Guide_Distributed Message Service for RocketMQ-Huawei Cloud, 访问时间为 十一月, 2025, https://support.huaweicloud.com/eu/devg-hrm/hrm-devg-009.html
- Schedule example | Study, 访问时间为 十一月, 2025, https://linqiankun.github.io/hexoblog/yuque/rocketMq/Schedule%20example/
- Delivering Scheduled Messages - Cloud Service Help Center - Huawei, 访问时间为 十一月, 2025, https://doc.hcs.huawei.com/devg/dms/hrm-devg-009.html
- How to deal with rocketmq message accumulation and message delay #3095 - GitHub, 访问时间为 十一月 2, 2025, https://github.com/apache/rocketmq/issues/3095
- Apache RocketMQ 5.0: Evolution of a messaging, event, and stream converged high availability architecture - ApacheCon Asia 2022 - Powered by ALC-Beijing, 访问时间为 十一月, 2025, https://apachecon.com/acasia2022/sessions/messaging-1052.html
- Why choose RocketMQ, 访问时间为 十一月, 2025, https://rocketmq.apache.org/docs/
- RocketMQ · 官方网站 | RocketMQ - The Apache Software Foundation, 访问时间为 十一月, 2025, https://rocketmq.apache.org/
- Release Notes - Apache RocketMQ - Version 5.0.0, 访问时间为 十一月, 2025, https://rocketmq.apache.org/release-notes/2022/09/09/5.0.0/
- RocketMQ Improvement Proposal - GitHub, 访问时间为 十一月, 2025, https://github.com/apache/rocketmq/wiki/RocketMQ-Improvement-Proposal
- Delay Message - Apache RocketMQ, 访问时间为 十一月, 2025, https://rocketmq.apache.org/docs/featureBehavior/02delaymessage/
- Java Client SDK - Apache RocketMQ, 访问时间为 十一月, 2025, https://rocketmq.apache.org/docs/sdk/02java/
- Common Implementation Schemes of Delayed Messages - Ric.Studio 进击的程序员笔记, 访问时间为 十一月 2, 2025, https://segmentfault.com/a/1190000041282378/en
- What Is DMS for RocketMQ? - Distributed Message Service (DMS) 2.3.0.1 Usage Guide (for Huawei Cloud Stack 8.5.1) 01, 访问时间为 十一月, 2025, https://info.support.huawei.com/enterprise/en/doc/EDOC1100450007/85505261/what-is-dms-for-rocketmq
- ApsaraMQ for RocketMQ:Step 3: Use an SDK to send and receive messages - Alibaba Cloud, 访问时间为 十一月, 2025, https://www.alibabacloud.com/help/en/apsaramq-for-rocketmq/cloud-message-queue-rocketmq-5-x-series/getting-started/step-3-use-sdks-to-send-and-receive-messages
- Run RocketMQ locally, 访问时间为 十一月, 2025, https://rocketmq.apache.org/docs/quickStart/01quickstart/
- ApsaraMQ for RocketMQ:Scheduled and delayed messages - Alibaba Cloud, 访问时间为 十一月, 2025, https://www.alibabacloud.com/help/en/apsaramq-for-rocketmq/cloud-message-queue-rocketmq-5-x-series/developer-reference/scheduled-and-delayed-messages
- Scheduled Message and Delayed Message - Tencent Cloud, 访问时间为 十一月, 2025, https://www.tencentcloud.com/document/product/1113/58502
- how to delete delayqueue message #1583 - apache/rocketmq - GitHub, 访问时间为 十一月 2, 2025, https://github.com/apache/rocketmq/issues/1583
- Release Notes - Apache RocketMQ - Version 5.3.2, 访问时间为 十一月, 2025, https://rocketmq.apache.org/release-notes/2025/03/08/5.3.2/
- Collection of Client Bindings for Apache RocketMQ - GitHub, 访问时间为 十一月, 2025, https://github.com/apache/rocketmq-clients
- Releases · apache/rocketmq-clients - GitHub, 访问时间为 十一月, 2025, https://github.com/apache/rocketmq-clients/releases
- [Feature] Implement Recalling API in Java SDK · Issue #878 · apache/rocketmq-clients, 访问时间为 十一月, 2025, https://github.com/apache/rocketmq-clients/issues/878
- Diff - d2f95889b4852251ba5c526ef790ea8735b42100^! - rocketmq, 访问时间为 十一月, 2025, https://apache.googlesource.com/rocketmq-clients/+/d2f95889b4852251ba5c526ef790ea8735b42100%5E%21/
- Release Notes - Apache RocketMQ - Version 5.1.1, 访问时间为 十一月, 2025, https://rocketmq.apache.org/release-notes/2023/05/15/5.1.1/
Comments NOTHING