RocketMQ 顺序消息:消息有序是指可以按照消息发送顺序来消费。RocketMQ 可以严格的保证消息有序,但是这个顺序逼格不是全局顺序,只是分区(queue)顺序。要保证群居顺序,只能有一个分区。
顺序消息
在 MQ 模型中,顺序要由三个阶段保证:
- 消息被发送时,保持顺序
- 消息被存储时的顺序和发送的顺序一致
- 消息被消费时的顺序和存储的顺序一致
发送时保持顺序,意味着对于有顺序要求的消息,用户应该在同一个线程中采用同步的方式发送。存储保持和发送的顺序一致,则要求在同一线程中被发送出来的消息 A/B,存储时 A 要在 B 之前。而消费保持和存储一致,则要求消息 A/B 到达 Consumer 之后必须按照先后顺序被处理。
生产者
1 | package com.laiyy.study.rocketmqprovider.order; |
控制台输出结果:
1 | 发送第:1 条信息成功:SendResult [sendStatus=SEND_OK, msgId=C0A800677E4C18B4AAC26ACE66560000, offsetMsgId=C0A834C800002A9F00000000000000B8, messageQueue=MessageQueue [topic=TOPIC_DEMO, brokerName=broker-a, queueId=0], queueOffset=1] |
可以看到,所有消息的 queueId
都为 0,顺序消息生产成功。
消费者
1 | public class OrderConsumer { |
顺序消费者与之前的 demo 最大的不同,在于 message listener
从 MessageListenerConcurrently
变为 MessageListenerOrderly
,消费标识从 ConsumeConcurrentlyStatus
变为 ConsumeOrderlyStatus
。
查看控制台输出:
1 | Consumer 消费信息:topic:TOPIC_DEMO,tags:TAG_A,消息体:HELLO!1 |
事务消息
在 RocketMQ 4.3 版本后,开放了事务消息。
RocketMQ 事务消息流程
RocketMQ 的事务消息,只要是通过消息的异步处理,可以保证本地事务和消息发送同事成功执行或失败,从而保证数据的最终一致性。
MQ 事务消息解决分布式事务问题,但是第三方 MQ 支持事务消息的中间件不多,如 RockctMQ,它们支持事务的方式也是类似于采用二阶段提交,但是市面上一些主流的 MQ 都是不支持事务消息的,如:Kafka、RabbitMQ
以 RocketMQ 为例,事务消息实现思路大致为:
- 第一阶段的 Prepared 消息,会拿到消息的地址
- 第二阶段执行本地事务
- 第三阶段通过第一阶段拿到的地址去访问消息,并修改状态
也就是说,在业务方法内想要消息队列提交两次消息,一次发送消息和一次确认消息。如果确认消息发送失败,RocketMQ 会定期扫描消息集群中的事务消息。这时候发现了 prepared 消息,它会向消息发送者确认,所以生产方需要实现一个 check 接口。RocketMQ 会根据发送端设置的策略来决定是回滚还是继续发送确认消息。这样就保证了消息发送与本地事务同时成功或同时失败。
事务消息的成功投递需要三个 Topic,分别是
- Half Topic:用于记录所有的 prepare 消息
- Op Half Topic:记录以及提交了状态的 prepare 消息
- Real Topic:事务消息真正的 topic,在 commit 后才会将消息写入该 topic,从而进行消息投递。
事务消息实现
1 | public class TransactionProducer { |
事务消息监听器:
1 | public class TransactionListenerImpl implements TransactionListener { |
运行生产者,查看控制台输出:
1 | 正在执行本地事务。。。。 |
需要注意:消息回查会隔一段时间执行一次,如果执行本地事务的时间太短,则控制台不会输出事务回查日志。
广播消息
生产者
1 | public class Producer { |
消费者
消费者需要将消费模式修改为 广播消费: consumer.setMessageModel(MessageModel.BROADCASTING);
1 | public class Consumer { |
验证
生产者控制台输出
1 | SendResult [sendStatus=SEND_OK, msgId=C0A80067971418B4AAC26B2965570000, offsetMsgId=C0A834C800002A9F00000000000026D0, messageQueue=MessageQueue [topic=BOARD_CAST_TOPIC, brokerName=broker-a, queueId=1], queueOffset=0] |
消费者控制台输出
1 | A Consumer 消费信息:topic:BOARD_CAST_TOPIC,tags:TAG_A,消息体:HELLO!1 |
1 | B Consumer 消费信息:topic:BOARD_CAST_TOPIC,tags:TAG_A,消息体:HELLO!1 |