RocketMQ - 如何实现顺序消息

RocketMQ - 如何实现顺序消息,第1张

顺序消息的使用场景

日常项目中需要保证顺序的应用场景非常多,比如交易场景中的订单创建、支付、退款等流程,先创建订单才能支付,支付完成的订单才能退款,这需要保证先进先出。又例如数据库的BinLog消息,数据库执行新增语句、修改语句,BinLog消息得到顺序也必须保证是新增消息、修改消息。

如何发送和消费顺序消息

我们使用RocketMQ顺序消息来模拟一下订单的场景,顺序消息分为两部分:顺序发送、顺序消费

1. 顺序发消息

上面代码模拟了按顺序依次发送创建、支付、退款消息到TopicTest中。在application.properties配置文件中指定producer.sync=true,默认是异步发送,此处改为同步发送。

MessageBuilder设置Header信息头,表示这是一条顺序消息,将消息固定地发送到第0个消息队列。

2. 顺序收消息

程序运行后,可以在控制台看到日志输出,也是按照顺序打印出来的

顺序发送的技术原理

RocketMQ的顺序消息分为2种情况:局部有序和全局有序。前面的例子是局部有序场景。

RocketMQ中消息发送有三种方式:同步、异步、单项。

顺序消息发送的原理比较简单,同一类消息发送到相同的队列即可。为了保证先发送的消息先存储到消息队列,必须使用同步发送的方式,否则可能出现先发送的消息后到消息队列中,此时消息就乱序了。

RocketMQ的核心代码如下:

选择队列的过程由messageQueueSelector和hashKey在实现类SelectMessageQueueByHash中完成

在队列列表的获取过程中,由Producer从NameServer根据Topic查询Broker列表,缓存在本地内存中,以便下次从缓存中读取。

普通发送的技术原理

RocketMQ中除了顺序消息外,还支持事务消息和延迟消息,非这三种特殊的消息称为普通消息。日常开发中最常用的是普通消息,这是因为最常用的场景就是系统间的异步解耦和流量的削峰填谷,这些场景下尽量保证消息高性能收发即可。

从普通消息与顺序消息的对比来看,普通消息在发送时选择消息队列的策略不同。普通消息发送选择队列有两种机制:轮询机制和故障规避机制。默认使用轮询机制,一个Topic有多个队列,轮询选择其中一个队列。

轮询机制的原理是路由信息TopicPublishInfo中维护了一个计数器sendWhichQueue,每发送一次消息需要查询一次路由,计算器就进行“+1”,通过计数器的值index与队列的数量取模计算来实现轮询算法。

轮询算法简单好用,但是有个弊端,如果轮询选择的队列是在宕机的Broker上,会导致消息发送失败,即使消息发送重试的时候重新选择队列,也可能还是在宕机的Broker上,无法规避发送失败的情况,因此就有了故障规避机制。

顺序消费的技术原理

RocketMQ支持两种消费模式:集群消费和广播消费。两者的区别是,在广播消费模式下每条消息会被ConsumerGroup的每个Consumer消费,在集群消费模式下每条消息只会被ConsumerGroup的一个Consumer消费。

多数场景都使用集群消费,消息每次消费代表一次业务处理,集群消费表示每条消息由业务应用集群中任意一个服务实例来处理。少数场景使用广播消费,例如数据发生变化,更新业务应用集群中每个服务的本地缓存,这就需要一条消息被整个集群都消费一次,默认是集群消费。

顺序消费也叫做有序消费,原理是同一个消息队列只允许Consumer中的一个消费线程拉取消费,Consumer中有个消费线程池,多个线程会同时消费消息。在顺序消费的场景下消费线程请求到Broker时会先申请独占锁,获得锁的请求则允许消费。

消息消费成功后,会向Broker提交消费进度,更新消费位点信息,避免下次拉取到已消费的消息,顺序消费中如果消费线程在监听器中进行业务处理时抛出异常,则不会提交消费进度,消费进度会阻塞在当前这条消息,并不会继续消费该队列中的后续消息,从而保证顺序消费。

在顺序消费的场景下,特别需要注意对异常的处理,如果重试也失败,会一直阻塞在当前消息,直到超出最大重试次数,从而在很长一段时间内无法消费后续消息造成队列消息堆积。

并发消费的原理

RocketMQ支持两种消费方式:顺序消费和并发消费。并发消费是默认的消费方式,日常开发过程中最常用的方式,除了顺序消费就是并发消费。

并发消费也称为乱序消费,其原理是同一个消息队列提供给Consumer中的多个消费线程拉取消费。Consumer中会维护一个消费线程池,多个消费线程可以并发去同一个消息队列中拉取消息进行消费。如果某个消费线程在监听器中进行业务处理时抛出异常,当前线程会进行重试,不影响其它消费线程和消费队列的消费进度,消费成功的线程正常提交消费进度。

并发消费相比于顺序消费没有资源争抢上锁的过程,消费消息的速度比顺序消费要快很多。

消息的幂等性

说到消息消费不得不提到消息的幂等性,业务代码中通常收到一条消息进行一次业务逻辑处理,如果一条相同的消息被重复收到几次,是否会导致业务重复处理?Consumer能够不重复接收消息?

RocketMQ不保证消息不被重复消费,如果业务对消息重复消费非常敏感,必须要在业务层面进行幂等性处理,具体实现可以通过分布式锁来完成。

在所有消息系统中消费消息有三种模式:at-most-once(最多一次)、at-least-once(最少一次)和exactly-only-once(精确仅一次),分布式消息系统都是在三者间取平衡,前两者是可行的并且被广泛使用。

ACL是access control list的简称,俗称访问控制列表。访问控制,基本上会涉及到用户、资源、权限、角色等概念,那在RocketMQ中上述会对应哪些对象呢?

另外,RocketMQ还支持按照客户端IP进行白名单设置。

在讲解如何使用ACL之前,我们先简单看一下RocketMQ ACL的请求流程:

对于上述具体的实现,将在后续文章中重点讲解,本文的目的只是希望给读者一个大概的了解。

acl默认的配置文件名:plain_acl.yml,需要放在${ROCKETMQ_HOME}/store/config目录下。下面对其配置项一一介绍。

全局白名单,其类型为数组,即支持多个配置。其支持的配置格式如下:

配置用户信息,该类型为数组类型。拥有accessKey、secretKey、whiteRemoteAddress、admin、defaultTopicPerm、defaultGroupPerm、topicPerms、groupPerms子元素。

登录用户名,长度必须大于6个字符。

登录密码。长度必须大于6个字符。

用户级别的IP地址白名单。其类型为一个字符串,其配置规则与globalWhiteRemoteAddresses,但只能配置一条规则。

boolean类型,设置是否是admin。如下权限只有admin=true时才有权限执行。

默认topic权限。该值默认为DENY(拒绝)。

默认消费组权限,该值默认为DENY(拒绝),建议值为SUB。

设置topic的权限。其类型为数组,其可选择值在下节介绍。

设置消费组的权限。其类型为数组,其可选择值在下节介绍。可以为每一消费组配置不一样的权限。

上面定义了全局白名单、用户级别的白名单,用户级别的权限,为了更好的配置ACL权限规则,下面给出权限匹配逻辑。

首先,需要在broker.conf文件中,增加参数aclEnable=true。并拷贝distribution/conf/plain_acl.yml文件到${ROCKETMQ_HOME}/conf目录。

broker.conf的配置文件如下:

plain_acl.yml文件内容如下:

从上面的配置可知,用户RocketMQ只能发送TopicTest的消息,其他topic无权限发送;拒绝oms_consumer_group消费组的消息消费,其他消费组默认可消费。

运行效果如图所示:

发现并不没有消费消息,符合预期。

关于RocketMQ ACL的使用就介绍到这里了,下一篇将介绍RocketMQ ACL实现原理。

推荐阅读:

1、 RocketMQ实战:生产环境中,autoCreateTopicEnable为什么不能设置为true

2、 RocketMQ 消息发送system busy、broker busy原因分析与解决方案

3、 RocketMQ HA机制(主从同步)

4、 RocketMQ事务消息实战


欢迎分享,转载请注明来源:内存溢出

原文地址:https://54852.com/bake/7916231.html

(0)
打赏 微信扫一扫微信扫一扫 支付宝扫一扫支付宝扫一扫
上一篇 2023-04-11
下一篇2023-04-11

发表评论

登录后才能评论

评论列表(0条)

    保存