seata 集成消息队列 RocketMQ

gomkiri 发布于 2025-11-10 50 次阅读


AI 摘要

SEATA集成RocketMQ官方方案不符合SpringBoot使用习惯?教你用配置类实现自动装配,告别重复创建SeataMQProducer,让分布式事务管理更优雅!

在阅读 SEATE 的官方文档时,官方给出的 seata 集成 RocketMQ 的方式是这样的:

public class BusinessServiceImpl implements BusinessService {
    private static final String NAME_SERVER = "127.0.0.1:9876";
    private static final String PRODUCER_GROUP = "test-group";
    private static final String TOPIC = "test-topic";
    private static SeataMQProducer producer= SeataMQProducerFactory.createSingle(NAME_SERVER, PRODUCER_GROUP);

    public void purchase(String userId, String commodityCode, int orderCount) {
      producer.send(new Message(TOPIC, "testMessage".getBytes(StandardCharsets.UTF_8)));
      //do something
    }
}
Java

但是这种方式明显是不符合 SpringBoot 的使用直觉的,因为一般情况下SeataMQProducer作为单例使用就可以了,不需要重复进行创建,但是当我查看了 seata 的配置列表,他确定没有为我们提供自动装配的配置方案,我想这么做应该是为了保证 seata 对 RocketMQ 的普适性和 seata 的轻量化(也可能是懒得做吧),但是这并不代表没有更加便捷的方式了。我们完全可以使用 配置 Config 类的形式完成自动装配:

前提:已经导入了正确的 seata 和 RocketMQ 的依赖,并完成了基本的配置,例如:

rocketmq:
  name-server: 127.0.0.1:9876
  producer:
    group: test-group
    send-message-timeout: 3000
    retry-times-when-send-failed: 2

seata:
  enabled: true
  application-id: your-application
  tx-service-group: my_tx_group
  service:
    vgroup-mapping:
      my_tx_group: default
    grouplist:
      default: 127.0.0.1:8091
YAML

然后就可以创建一个SeataRocketMQConfig

@Configuration
@EnableConfigurationProperties(RocketMQProperties.class)
public class SeataRocketMQConfig {

    @Autowired
    private RocketMQProperties rocketMQProperties;

    @Bean
    public SeataMQProducer seataMQProducer() {
        RocketMQProperties.Producer producerConfig = rocketMQProperties.getProducer();
        return SeataMQProducerFactory.createSingle(
            rocketMQProperties.getNameServer(),
            producerConfig.getGroup()
        );
    }

    @Bean
    @ConditionalOnMissingBean
    public RocketMQTemplate rocketMQTemplate(SeataMQProducer seataMQProducer) {
        // 如果需要,可以创建集成了Seata的RocketMQTemplate
        return new RocketMQTemplate();
    }
}
Java

这样我们就可以通过 rocketMQProperties 的信息(也就是配置文件中的 NameServer 和 Group)来完成 SeateMQproducer 的单例 Bean 的创建。在服务中我们只需要通过@Autowired private SeataMQProducer seataMQProducer;等方式将其注入即可。

注意:使用 seataMQProducer 的方法上面别忘了加开启全局事务的 @GlobalTransactional注解