RocketMQ的事务消息

RocketMQ的事务消息,第1张

RocketMQ的事务消息,是指发送消息事件和其他事件需要同时成功或同时失败。比如银行转账,A银行的某账户要转一万元到B银行的某账户。A银行发送“B银行账户增加一万元”这个消息,要和“从A银行账户扣除一万元”这个 *** 作同时成功或者同时失败。

RocketMQ采用两阶段提交的方式实现事务消息,TransactionMQProducer处理上面情况的流程是,先发一个“准备从B银行账户增加一万元”的消息,发送成功后做从A银行账户扣除一万元的 *** 作,根据 *** 作结果是否成功,确定之前的“准备从B银行账户增加一万元”的消息是做commit还是rollback,具体流程如下:

1)发送方向RocketMQ发送“待确认”消息。

2)RocketMQ将收到的“待确认”消息持久化成功后,向发送方回复消息已经发送成功,此时第一阶段消息发送完成。

3)发送方开始执行本地事件逻辑。

4)发送方根据本地事件执行结果向RocketMQ发送二次确认(Commit或是Rollback)消息,RocketMQ收到Commit状态则将第一阶段消息标记为可投递,订阅方将能够收到该消息;收到Rollback状态则删除第一阶段的消息,订阅方接收不到该消息。

5)如果出现异常情况,步骤4)提交的二次确认最终未到达RocketMQ,服务器在经过固定时间段后将对“待确认”消息发起回查请求。

6)发送方收到消息回查请求后(如果发送一阶段消息的Producer不能工作,回查请求将被发送到和Producer在同一个Group里的其他Producer),通过检查对应消息的本地事件执行结果返回Commit或Roolback状态。

7)RocketMQ收到回查请求后,按照步骤4)的逻辑处理。

上面的逻辑似乎很好地实现了事务消息功能,它也是RocketMQ之前的版本实现事务消息的逻辑。

但是因为RocketMQ依赖将数据顺序写到磁盘这个特征来提高性能,步骤4)却需要更改第一阶段消息的状态,这样会造成磁盘Catch的脏页过多,降低系统的性能。所以RocketMQ在4.x的版本中将这部分功能去除。系统中的一些上层Class都还在,用户可以根据实际需求实现自己的事务功能。

客户端有三个类来支持用户实现事务消息,

第一个类是LocalTransaction-Executer,用来实例化步骤3)的逻辑,根据情况返回LocalTransactionState.ROLLBACK_MESSAGE或者

LocalTransactionState.COMMIT_MESSAGE状态。

第二个类是TransactionMQProducer,它的用法和DefaultMQProducer类似,要通过它启动一个Producer并发消息,但是比DefaultMQProducer多设置本地事务处理函数和回查状态函数。

第三个类是TransactionCheckListener,实现步骤5)中MQ服务器的回查请求,返回LocalTransactionState.ROLLBACK_MESSAGE或者LocalTransactionState.COMMIT_MESSAGE

上图说明了事务消息的大致方案,其中分为两个流程:正常事务消息的发送及提交、事务消息的补偿流程。

1.事务消息发送及提交:

(1) 发送消息(half消息)。

(2) 服务端响应消息写入结果。

(3) 根据发送结果执行本地事务(如果写入失败,此时half消息对业务不可见,本地逻辑不执行)。

(4) 根据本地事务状态执行Commit或者Rollback(Commit *** 作生成消息索引,消息对消费者可见)。

2.补偿流程:

(1) 对没有Commit/Rollback的事务消息(pending状态的消息),从服务端发起一次“回查”。

(2) Producer收到回查消息,检查回查消息对应的本地事务的状态。

(3) 根据本地事务状态,重新Commit或者Rollback。

其中,补偿阶段用于解决消息Commit或者Rollback发生超时或者失败的情况。

在RocketMQ事务消息的主要流程中,一阶段的消息如何对用户不可见。其中,事务消息相对普通消息最大的特点就是一阶段发送的消息对用户是不可见的。那么,如何做到写入消息但是对用户不可见呢?RocketMQ事务消息的做法是:如果消息是half消息,将备份原消息的主题与消息消费队列,然后改变主题为RMQ_SYS_TRANS_HALF_TOPIC。由于消费组未订阅该主题,故消费端无法消费half类型的消息。然后二阶段会显示执行提交或者回滚half消息(逻辑删除)。当然,为了防止二阶段 *** 作失败,RocketMQ会开启一个定时任务,从Topic为RMQ_SYS_TRANS_HALF_TOPIC中拉取消息进行消费,根据生产者组获取一个服务提供者发送回查事务状态请求,根据事务状态来决定是提交或回滚消息。

在RocketMQ中,消息在服务端的存储结构如下,每条消息都会有对应的索引信息,Consumer通 过ConsumeQueue这个二级索引来读取消息实体内容,其流程如下:

RocketMQ的具体实现策略是:写入的如果事务消息,对消息的Topic和Queue等属性进行替换,同时将原来的Topic和Queue信息存储到消息的属性中,正因为消息主题被替换,故消息并不会转发到该原主题的消息消费队列,消费者无法感知消息的存在,不会消费。其实改变消息主题是RocketMQ的常用“套路”,回想一下延时消息的实现机制。RMQ_SYS_TRANS_HALF_TOPIC

在完成一阶段写入一条对用户不可见的消息后,二阶段如果是Commit *** 作,则需要让消息对用户可见;如果是Rollback则需要撤销一阶段的消息。先说Rollback的情况。对于Rollback,本身一阶段的消息对用户是不可见的,其实不需要真正撤销消息(实际上RocketMQ也无法去真正的删除一条消息,因为是顺序写文件的)。但是区别于这条消息没有确定状态(Pending状态,事务悬而未决),需要一个 *** 作来标识这条消息的最终状态。RocketMQ事务消息方案中引入了Op消息的概念,用Op消息标识事务消息已经确定的状态(Commit或者Rollback)。如果一条事务消息没有对应的Op消息,说明这个事务的状态还无法确定(可能是二阶段失败了)。引入Op消息后,事务消息无论是Commit或者Rollback都会记录一个Op *** 作。Commit相对于Rollback只是在写入Op消息前创建Half消息的索引。

RocketMQ将Op消息写入到全局一个特定的Topic中通过源码中的方法—

TransactionalMessageUtil.buildOpTopic();这个Topic是一个内部的Topic(像Half消息的Topic一样),不会被用户消费。Op消息的内容为对应的Half消息的存储的Offset,这样通过Op消息能索引到Half消息进行后续的回查 *** 作。

在执行二阶段Commit *** 作时,需要构建出Half消息的索引。一阶段的Half消息由于是写到一个特殊的Topic,所以二阶段构建索引时需要读取出Half消息,并将Topic和Queue替换成真正的目标的Topic和Queue,之后通过一次普通消息的写入 *** 作来生成一条对用户可见的消息。所以RocketMQ事务消息二阶段其实是利用了一阶段存储的消息的内容,在二阶段时恢复出一条完整的普通消息,然后走一遍消息写入流程。

如果在RocketMQ事务消息的二阶段过程中失败了,例如在做Commit *** 作时,出现网络问题导致Commit失败,那么需要通过一定的策略使这条消息最终被Commit。RocketMQ采用了一种补偿机制,称为“回查”。Broker端对未确定状态的消息发起回查,将消息发送到对应的Producer端(同一个Group的Producer),由Producer根据消息来检查本地事务的状态,进而执行Commit或者Rollback。

Broker端通过对比Half消息和Op消息进行事务消息的回查并且推进CheckPoint(记录那些事务消息的状态是确定的)。

值得注意的是,rocketmq并不会无休止的的信息事务状态回查,默认回查15次,如果15次回查还是无法得知事务状态,rocketmq默认回滚该消息。

TxConsumer类实现

大家好,我是BugKing,不知不觉正式工作快满2年了,在工作之前没有用过消息队列中间件,在这想分享下我这两年对RocketMQ的使用以及一些思考,因为内容比较多,会用好几期来分享。

先简单来聊下我在日常开发中,哪些问题适合使用RocketMQ来解决,因为我是搞IM的,所以下面我都会以IM的角度来分享。

在我负责的IM系统中,经常会遇到业务方群发几十万消息的场景,那面临这么多的请求,如何避免请求压垮我们的IM聊天系统呢?我们的系统应该是在自身能力范围内尽可能多地处理请求,那我们就可以使用消息队列来达到流量控制和保护后端服务的目的。

加入RocketMQ后,整个业务方发送消息的流程变成:

1、业务方调用rpc框架如dubbo接口发送消息后,直接将消息内容放入RocketMQ;

2、发消息后端服务从RocketMQ中获取消息内容,完成后续发消息流量,投递给前端。

这种设计既有优点也有缺点

那想在同一个topic下的某种消息进行流量控制限速呢?有没有什么好的办法?

我的做法是根据某种类型消息的标识,通过令牌桶算法(单机限流),根据你预估的处理能力,为这种消息单独设置一个线程池,线程池队列长度可以设置大些,用这个线程池也单独处理这种消息,这样也不会让其他类型的消息堆积在MQ。

IM系统也需要解决的核心问题时,如何利用有限的服务器资源,尽可能多地处理大量发送消息。在一个正常的IM系统中,一个完整的消息发送包含了很多 *** 作,当你发出去一条消息后可会有这些 *** 作:

1、消息入库

2、消息投递前端

3、用户不在线需要发送离线push。

4、用户这条消息被风控了需要发送风控提示。

5、消息需要统计数据,包含每天发送量,push量等等。

6、....

如果没有任何优化,正常的处理流程时:消息投递后,依次调用上述流程,然后结束。

对于这几个步骤来说,决定消息是否发送成功,实际上只有消息入库这个步骤,只要消息入库了,用户就一定能看到消息,就算当时没有投递给前端,后续用户拉历史消息也能把消息拉出来,但是为了判断用户在不在线,需不需要发离线push,依赖消息投递前端的结果,所以当消息入库、消息投递前端后,就可以马上结束流程,然后把消息体放入rokcetMQ中,由消息队列异步执行后续的 *** 作。

rocketMQ的另一个作用,就是实现系统之间的解耦。

我们知道订单时在电商系统中比较核心的,当有一个新订单时:

1、支付系统发起支付流程

2、风控需要审核

3、IM系统发送一些卡片消息(比如确认收货地址)

4、统计系统需要统计数据

5、.....

这些订单下油的系统都需要实时获得订单数据。随着业务的发展,订单的下游可能在不断增加,负责订单的程序员不得不花费大量的精力,应对不断变化的下游系统,不停地调试订单系统与下游系统的接口。任何一个接口变更,订单系统就需要修改并上线,这是不能接受的。几乎所有的电商都会选择消息队列来解决类型的系统耦合的问题。这时候引入rocketMQ憨,订单系统在有一个新订单时,发送一条消息到rocketMQ的topic中,所有下游系统都订阅topic,这样每个下游可以根据订单消息来做相应的处理。

RocketMQ用的消息模式时发布 - 订阅模型。在发布 - 订阅模型中,消息的发送方称为发布者,接收方称为订阅者,服务端存放的消息的容器称为主题(Topic)。传统的队列模式和这种模型最大的区别就是,一份消息数据能不能被消费多次对的问题。因为在传统的队列模型中,任何一条消息都只能被一个消费者收到。

RocketMQ是发布-订阅模型,但是RocketMQ也有队列的概念,那队列的作用是什么呢?

我们都知道RocketMQ中有ack机制,确保消息不会在传递过程中由于网络或服务器故障而丢失,在消费端如果收到消息并完成了业务逻辑后,会给MQ回一个消费成功的确认,代表一条消息被成功消费,否则会给消费者重新发送消息,直到成功ack。这个确认机制保证了消息传递的可靠性,但是也带来了一个问题,为了确保消息的有序性,在某一笑消息被成功消息前,下一条消息是不能被消费的,否则违背了有序性这个原则,也就是每个Topic在任意时刻,最多只能有一个消费者在进行消费,这样消费端总体的消费性能就不能通过水平扩展消费者数量来提升,所以RocketMQ引入了队列来解决这个问题。来看下面这个图:

RocketMQ的每个Topic都包含多个队列,通过多个队列来实现多实例并行生产和消费。rocketMQ只在队列上保证消息的有序性,Topic层面是无法保证消息严格顺序的。每个消费组都有主题中一份完整的消息,不同消费组之间消费进度不受对方影响,

一条消息被消费组1消费过,也会给消费组2消费。

每一个消费组中包含多个消费者,同一个消费组内的消费者是竞争关系,比如一个消费组内的一条消息被消费者1消费了,就不会再给同组的其他消费者消费。

在一个Topic下的消息消费过程中,消息需要被不同的组进行多次消费,所以每个消费组在每个队列都维护一个消费位置,在这个位置之前的消息都是被消费过的,之后的消息都是没有被消费过。

** 需要注意的是Topic和消费组的关系、消费组和消费者的关系,消费组和队列数没有关系,不是有多少消费者就有多少队列,队列数可以根据数据量和消费速度合理配置**

可以按照某个唯一标识,比如IM中,根据消息发送方用户id,通过一致性哈希算法,计算出队列ID,指定队列ID发送,这样可以保证相同的用户发的消息总被发送到同一个队列上,可以确保严格顺序。

时间不早了~下期再见。

顺序消息的使用场景

日常项目中需要保证顺序的应用场景非常多,比如交易场景中的订单创建、支付、退款等流程,先创建订单才能支付,支付完成的订单才能退款,这需要保证先进先出。又例如数据库的BinLog消息,数据库执行新增语句、修改语句,BinLog消息得到顺序也必须保证是新增消息、修改消息。

如何发送和消费顺序消息

我们使用RocketMQ顺序消息来模拟一下订单的场景,顺序消息分为两部分:顺序发送、顺序消费。

1. 顺序发消息

上面代码模拟了按顺序依次发送创建、支付、退款消息到TopicTest中。在application.properties配置文件中指定producer.sync=true,默认是异步发送,此处改为同步发送。

MessageBuilder设置Header信息头,表示这是一条顺序消息,将消息固定地发送到第0个消息队列。

2. 顺序收消息

程序运行后,可以在控制台看到日志输出,也是按照顺序打印出来的

顺序发送的技术原理

RocketMQ的顺序消息分为2种情况:局部有序和全局有序。前面的例子是局部有序场景。

RocketMQ中消息发送有三种方式:同步、异步、单项。

顺序消息发送的原理比较简单,同一类消息发送到相同的队列即可。为了保证先发送的消息先存储到消息队列,必须使用同步发送的方式,否则可能出现先发送的消息后到消息队列中,此时消息就乱序了。

RocketMQ的核心代码如下:

选择队列的过程由messageQueueSelector和hashKey在实现类SelectMessageQueueByHash中完成

在队列列表的获取过程中,由Producer从NameServer根据Topic查询Broker列表,缓存在本地内存中,以便下次从缓存中读取。

普通发送的技术原理

RocketMQ中除了顺序消息外,还支持事务消息和延迟消息,非这三种特殊的消息称为普通消息。日常开发中最常用的是普通消息,这是因为最常用的场景就是系统间的异步解耦和流量的削峰填谷,这些场景下尽量保证消息高性能收发即可。

从普通消息与顺序消息的对比来看,普通消息在发送时选择消息队列的策略不同。普通消息发送选择队列有两种机制:轮询机制和故障规避机制。默认使用轮询机制,一个Topic有多个队列,轮询选择其中一个队列。

轮询机制的原理是路由信息TopicPublishInfo中维护了一个计数器sendWhichQueue,每发送一次消息需要查询一次路由,计算器就进行“+1”,通过计数器的值index与队列的数量取模计算来实现轮询算法。

轮询算法简单好用,但是有个弊端,如果轮询选择的队列是在宕机的Broker上,会导致消息发送失败,即使消息发送重试的时候重新选择队列,也可能还是在宕机的Broker上,无法规避发送失败的情况,因此就有了故障规避机制。

顺序消费的技术原理

RocketMQ支持两种消费模式:集群消费和广播消费。两者的区别是,在广播消费模式下每条消息会被ConsumerGroup的每个Consumer消费,在集群消费模式下每条消息只会被ConsumerGroup的一个Consumer消费。

多数场景都使用集群消费,消息每次消费代表一次业务处理,集群消费表示每条消息由业务应用集群中任意一个服务实例来处理。少数场景使用广播消费,例如数据发生变化,更新业务应用集群中每个服务的本地缓存,这就需要一条消息被整个集群都消费一次,默认是集群消费。

顺序消费也叫做有序消费,原理是同一个消息队列只允许Consumer中的一个消费线程拉取消费,Consumer中有个消费线程池,多个线程会同时消费消息。在顺序消费的场景下消费线程请求到Broker时会先申请独占锁,获得锁的请求则允许消费。

消息消费成功后,会向Broker提交消费进度,更新消费位点信息,避免下次拉取到已消费的消息,顺序消费中如果消费线程在监听器中进行业务处理时抛出异常,则不会提交消费进度,消费进度会阻塞在当前这条消息,并不会继续消费该队列中的后续消息,从而保证顺序消费。

在顺序消费的场景下,特别需要注意对异常的处理,如果重试也失败,会一直阻塞在当前消息,直到超出最大重试次数,从而在很长一段时间内无法消费后续消息造成队列消息堆积。

并发消费的原理

RocketMQ支持两种消费方式:顺序消费和并发消费。并发消费是默认的消费方式,日常开发过程中最常用的方式,除了顺序消费就是并发消费。

并发消费也称为乱序消费,其原理是同一个消息队列提供给Consumer中的多个消费线程拉取消费。Consumer中会维护一个消费线程池,多个消费线程可以并发去同一个消息队列中拉取消息进行消费。如果某个消费线程在监听器中进行业务处理时抛出异常,当前线程会进行重试,不影响其它消费线程和消费队列的消费进度,消费成功的线程正常提交消费进度。

并发消费相比于顺序消费没有资源争抢上锁的过程,消费消息的速度比顺序消费要快很多。

消息的幂等性

说到消息消费不得不提到消息的幂等性,业务代码中通常收到一条消息进行一次业务逻辑处理,如果一条相同的消息被重复收到几次,是否会导致业务重复处理?Consumer能够不重复接收消息?

RocketMQ不保证消息不被重复消费,如果业务对消息重复消费非常敏感,必须要在业务层面进行幂等性处理,具体实现可以通过分布式锁来完成。

在所有消息系统中消费消息有三种模式:at-most-once(最多一次)、at-least-once(最少一次)和exactly-only-once(精确仅一次),分布式消息系统都是在三者间取平衡,前两者是可行的并且被广泛使用。


欢迎分享,转载请注明来源:内存溢出

原文地址:https://54852.com/bake/7954714.html

(0)
打赏 微信扫一扫微信扫一扫 支付宝扫一扫支付宝扫一扫
上一篇 2023-04-11
下一篇2023-04-11

发表评论

登录后才能评论

评论列表(0条)

    保存