RocketMQ - 基于延迟消息机制优化大量订单的定时退款扫描问题

RocketMQ - 基于延迟消息机制优化大量订单的定时退款扫描问题,第1张

我们先考虑一个正常的电商购物流程,一般来说我们作为用户在一个点上APP上都会选择一些商品加入购物车,然后对购物车里选择的一些商品统一下一个订单,此时后台的订单系统必然会在订单数据库中创建一个订单。

但是我们下了一个订单之后,虽然订单数据库里会有一个订单,订单的状态却是"待支付"状态,因为此时你还没有支付这个订单,我们的订单系统其实也在等待订单用户完成这个订单的支付。

这里就有两种可能了,一种是用户下单之后立马就支付掉了,那么接着订单系统可以走后续的流程,比如通过MQ发送消息通知优惠券系统给用户发放优惠券等等。

另外一种可能是用户下单后,一直在犹豫,迟迟没有下订单。

因此在实际情况中吗,其实APP的大量用户每天会下很多订单,但是不少订单可能是一直没有进行支付的,可能他下了单之后犹豫了,可能是他忘了支付了。

所以一般订单系统都必须设置一个规则, 当一个订单下单之后,比如超过30分钟没有支付,那么就必须自动关闭这个订单,后续你如果要购买这个订单里的商品,就要重新下订单了。

可能你的订单系统就需要有一个后台线程,不停地扫描订单数据库里所有的未支付状态的订单,看他如果超过30分钟还没支付,就自动把订单状态改成“已关闭”。

但是这里就引入一个问题,就是订单系统的后台线程必须要不停地扫描各种未支付的订单,这种实现方式实际上并不是很好。

一个原因是未支付状态的订单可能是比较多,然后你需要不停地扫描他们, 可能每个未支付状态的订单要被扫描N多遍,才会发现他已经超过30分钟未支付了。

另外一个是很难去分布式并行扫描你的订单,因为假设你的订单数量特别多,然后你要是打算用多台机器部署订单扫描服务,但是每台机器扫描哪些订单?怎么扫描?什么时候扫描?这都是一系列麻烦的问题。

因此针对此类场景的问题,MQ里的延迟消息就会出场了,他是特别适合在这种场景里使用的,而且在实际项目中,MQ的延迟消息使用的往往是很多的。

所谓延迟消息,意思就是当我们订单系统在创建了一个订单之后,可以发送一条消息到MQ里去,我们指定这条消息是延迟消息,比如要等待30分钟,才能被订单扫描服务给消费到。

这样当订单扫描服务在30分钟后消费到了一条消息之后,就可以针对这条消息的信息,去订单数据库里查找这个订单,看看他的状态是否还是未支付状态。如果还是未支付,那么就可以关闭这个订单。

这种方式就比用后台线程扫描订单的方式要好得多了,一个是对每个订单你只会在创建30分钟之后查询他一次而已,不会反复扫描订单多次。

另外就是如果你的订单数量比较多,你完全可以让订单扫描服务多部署几台服务器,然后对于这个Topic可以多指定一些MessageQueue,这样每个订单扫描服务的机器可以作为一个consumer都会处理一部分订单的查询任务。

基于订单定时退款场景,来分析RocketMQ的延迟消息的代码实现

大家看到上面的代码,其实发送延迟消息的核心,就是设置消息的delayTimeLevel,也就是延迟级别,RocketMQ默认支持一些延迟级别如下:

所以上面的代码中设置延迟级别为3,也就是延迟10s,如果是订单延迟扫描场景,可以设置延迟级别为16,也就是对应上面的30分钟。

1、检查网络的网速是否正常,局域网里是否有人看**或者下载软件等拖慢了网速。2、连接苹果跟iTunes时,检查iT的设置情况,打开iTunes--编辑--偏好设置--家长控制--iTunesStore这一项勾选(把允许访问iTunesU这一项也勾选)--确

Jafka/Kafka

Kafka是Apache下的一个子项目,是一个高性能跨语言分布式Publish/Subscribe消息队列系统,而Jafka是在Kafka之上孵化而来的,即Kafka的一个升级版。具有以下特性:快速持久化,可以在O(1)的系统开销下进行消息持久化;高吞吐,在一台普通的服务器上既可以达到10W/s的吞吐速率;完全的分布式系统,Broker、Producer、Consumer都原生自动支持分布式,自动实现复杂均衡;支持Hadoop数据并行加载,对于像Hadoop的一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka通过Hadoop的并行加载机制来统一了在线和离线的消息处理,这一点也是本课题所研究系统所看重的。Apache Kafka相对于ActiveMQ是一个非常轻量级的消息系统,除了性能非常好之外,还是一个工作良好的分布式系统。

其他一些队列列表HornetQ、Apache Qpid、Sparrow、Starling、Kestrel、Beanstalkd、Amazon SQS就不再一一分析。

我也是个菜鸟,说下我的理解:IBM MQ是有“发送队列”、“接收队列”的。ActiveMQ没有“发送队列”、“接收队列”,就是普通的建一个“队列”,然后发消息到这个队列,从这个队列接收消息。

前置文章:

RabbitMQ-消息可靠性&延迟消息

一、MQ常见问题

二、消息堆积-惰性队列

1、消息堆积问题

2、解决消息堆积方法

3、惰性队列

三、高可用-MQ集群

1、集群分类

2、普通集群

3、镜像集群

4、冲裁队列

确保发送的消息至少被消费一次;

实现消息的延迟投递;

处理消息无法及时消费的问题;

避免单点MQ故障导致整体不可用;

1、消息堆积问题

当生产者发送消息的速度超过了消费者处理消息的速度,就会导致队列中的消息堆积,直到队列存储消息达到上限。最早接收到的消息,可能就会 成为死信 ,会 被丢弃 ,这就是消息堆积问题。

2、解决消息堆积方法

3、惰性队列

从RabbitMQ的360版本开始,就增加了Lazy Queues的概念,也就是惰性队列。

Ⅰ 接收到消息后直接 存入磁盘 而非内存;

Ⅱ 消费者要消费消息时才会 从磁盘中读取 并加载到内存;

Ⅲ 支持 数百万条 的消息存储。

要设置一个队列为惰性队列,只需要在声明队列时,指定x-queue-mode属性为lazy即可。

Ⅰ 可以通过命令行将一个运行中的队列修改为惰性队列,如下:

rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues

Ⅱ 用SpringAMQP声明惰性队列,如下:

@Bean注解的形式,如下:

@RabbitListener注解的形式,如下:

Ⅰ 优点

基于磁盘存储,消息上限高;

没有间歇性的page-out,性能比较稳定;

Ⅱ 缺点

基于磁盘存储,消息时效性会降低;

性能受限于磁盘的IO。

官方文档: Clustering Guide — RabbitMQ 。

1、集群分类

是一种 分布式集群 ,将队列分散到集群的各个节点,从而提高整个集群的并发能力。

是一种 主从集群 ,普通集群的基础上,添加了主从备份功能,提高集群的数据可用性。

注意:镜像集群虽然支持主从,但主从同步并不是强一致的,某些情况下可能有数据丢失的风险。

在RabbitMQ的38版本以后推出的,底层采用Raft协议确保主从的数据一致性。

2、普通集群

Ⅰ 会在集群的各个节点间共享部分数据,包括:交换机、队列元信息。不包含队列中的消息;

Ⅱ 当访问集群某节点时,如果队列不在该节点,会从数据所在节点传递到当前节点并返回;

Ⅲ 队列所在节点宕机,队列中的消息就会丢失。

Ⅰ 获取Cookie

RabbitMQ底层依赖于Erlang,而Erlang虚拟机就是一个面向分布式的语言,默认就支持集群模式。集群模式中的每个RabbitMQ 节点使用 cookie 来确定它们是否被允许相互通信。

要使两个节点能够通信,它们必须具有相同的共享秘密,称为 Erlang cookie 。cookie 只是一串最多 255 个字符的字母数字字符。

每个集群节点必须具有 相同的 cookie 。实例之间也需要它来相互通信。

首先获取Cookie,指令如下:

其中 YYNCLCJEKVNUFYQFPNZH 这一串就是生成的Cookie,如下:

Ⅱ 删除现有mq容器

Ⅲ 准备rabbitmqconf配置文件

此处选择在tmp目录下创建,如下:

配置文件内容如下:

Ⅳ 准备Cookie记录文件

Ⅴ 准备集群目录

Ⅵ 拷贝配置文件、Cookie文件到目录

echo :用于字符串的输出,输出字符串到 | 后面;

-t :表示先打印命令,再执行;

-n 1 :表示执行命令时用的args个数为1个。

Ⅶ 创建集群网络

Ⅷ 运行容器

集群中的节点标示默认都是: rabbit@[hostname] 。

Ⅰ 往rabbit@mq1添加队列

在mq2、mq3中也可以查看到该队列,因为元信息共享。

Ⅱ 往simplequeue添加数据

在mq2、mq3中可以查看到消息,如下:

Ⅲ 让mq1宕机

mq2、mq3无法读取到数据,因为只共享元信息,没有同步备份数据,如下:

3、镜像集群

镜像集群官方文档: Classic Queue Mirroring — RabbitMQ 。

普通集群不具备高可用的特性,使用镜像集群可以解决这个问题。

Ⅰ 镜像队列结构是一主多从(从就是镜像);

Ⅱ 所有 *** 作都是主节点完成,然后同步给镜像节点;

Ⅲ 主宕机后,镜像节点会替代成新的主(如果在主从同步完成前,主就已经宕机,可能出现数据丢失);

Ⅳ 不具备负载均衡功能,因为所有 *** 作都会有主节点完成(但是不同队列,其主节点可以不同,可以利用这个提高吞吐量)。

Ⅰ 设置exactly模式

Ⅱ 创建队列

Ⅲ 发送消息

Ⅳ 让mq1宕机

注意:mq1恢复后,该队列的主节点仍然为mq3。

4、冲裁队列

Ⅰ 与镜像队列一样,都是主从模式,支持主从数据同步;

Ⅱ 使用非常简单,没有复杂的配置;

Ⅲ 主从同步基于Raft协议,强一致。

注意:仲裁队列是38版本以后才有的新功能。

+2表示有2个镜像节点,仲裁队列默认镜像数为5,集群节点不足5则都是镜像。

@Bean注解配置

修改配置文件

以上即为RabbitMQ-消息堆积&高可用的全部内容,感谢阅读。

1,为什么要用mq?

2,引入mq会多哪些问题?

3,如何解决这些问题?

---

一:传统模式有哪些痛点

(1)有些复杂的业务系统,一次用户请求可能会同步调用N个系统的接口,需要等待所有的接口都返回了,才能真正的获取执行结果。这种同步接口调用的方式总耗时比较长,非常影响用户的体验,特别是在网络不稳定的情况下,极容易出现接口超时问题。

(2)系统之间耦合性太高,如果调用的任何一个子系统出现异常,整个请求都会异常,对系统的稳定性非常不利。

(3)对于类似于秒杀场景的峰值爆炸的场景,系统的稳定性堪忧;

二:为什么要用mq?

(1)异步:同步接口调用导致响应时间长的问题,使用mq之后,将同步调用改成异步,能够显著减少系统响应时间。避免耗时时间长,影响用户体验的事情发生

(2)解耦:子系统间耦合性太大的问题,使用mq之后,我们只需要依赖于mq,避免了各个子系统间的强依赖问题。这样就把之前复杂的业务子系统的依赖关系,转换为只依赖于mq的简单依赖,从而显著的降低了系统间的耦合度。

(3)削峰:由于突然出现的请求峰值,导致系统不稳定的问题。使用mq后,能够起到消峰的作用。

订单系统接收到用户请求之后,将请求直接发送到mq,然后订单消费者从mq中消费消息,做写库 *** 作。如果出现请求峰值的情况,由于消费者的消费能力有限,会按照自己的节奏来消费消息,多的请求不处理,保留在mq的队列中,不会对系统的稳定性造成影响。

三:引入mq会出现哪些问题:

1,重复消息问题

重复消费问题可以说是mq中普遍存在的问题,不管你用哪种mq都无法避免。

有哪些场景会出现重复的消息呢?

消息生产者产生了重复的消息

kafka和rocketmq的offset被回调了

消息消费者确认失败

消息消费者确认时超时了

业务系统主动发起重试

如果重复消息不做正确的处理,会对业务造成很大的影响,产生重复的数据,或者导致数据异常;

(2)数据一致性问题

如果mq的消费者业务处理异常的话,就会出现数据一致性问题。

比如:一个完整的业务流程是,下单成功之后,送100个积分。下单写库了,但是消息消费者在送积分的时候失败了,就会造成数据不一致的情况,即该业务流程的部分数据写库了,另外一部分没有写库。

如果下单和送积分在同一个事务中,要么同时成功,要么同时失败,是不会出现数据一致性问题的。

但由于跨系统调用,为了性能考虑,一般不会使用强一致性的方案,而改成达成最终一致性即可。

(3)消息丢失问题:

哪些场景会出现消息丢失问题呢?

消息生产者发生消息时,由于网络原因,发生到mq失败了。

mq服务器持久化时,磁盘出现异常

kafka和rocketmq的offset被回调时,略过了很多消息。

消息消费者刚读取消息,已经ack确认了,但业务还没处理完,服务就被重启了。

导致消息丢失问题的原因挺多的,生产者、mq服务器、消费者 都有可能产生问题,我在这里就不一一列举了。最终的结果会导致消费者无法正确的处理消息,而导致数据不一致的情况。

(4)消息顺序问题

有些业务数据是有状态的,比如订单有:下单、支付、完成、退货等状态,如果订单数据作为消息体,就会涉及顺序问题了。如果消费者收到同一个订单的两条消息,第一条消息的状态是下单,第二条消息的状态是支付,这是没问题的。但如果第一条消息的状态是支付,第二条消息的状态是下单就会有问题了,没有下单就先支付了?

消息顺序问题是一个非常棘手的问题,比如:

kafka同一个partition中能保证顺序,但是不同的partition无法保证顺序。

rabbitmq的同一个queue能够保证顺序,但是如果多个消费者同一个queue也会有顺序问题。

如果消费者使用多线程消费消息,也无法保证顺序。

如果消费消息时同一个订单的多条消息中,中间的一条消息出现异常情况,顺序将会被打乱。

还有如果生产者发送到mq中的路由规则,跟消费者不一样,也无法保证顺序。

(5)消息堆积问题

如果消息消费者读取消息的速度,能够跟上消息生产者的节奏,那么整套mq机制就能发挥最大作用。但是很多时候,由于某些批处理,或者其他原因,导致消息消费的速度小于生产的速度。这样会直接导致消息堆积问题,从而影响业务功能。

这里以下单开通会员为例,如果消息出现堆积,会导致用户下单之后,很久之后才能变成会员,这种情况肯定会引起大量用户投诉。

(6)系统复杂度提升

这里说的系统复杂度和系统耦合性是不一样的,比如以前只有:系统A、系统B和系统C 这三个系统,现在引入mq之后,你除了需要关注前面三个系统之外,还需要关注mq服务,需要关注的点越多,系统的复杂度越高。

mq的机制需要:生产者、mq服务器、消费者。

有一定的学习成本,需要额外部署mq服务器,而且有些mq比如:rocketmq,功能非常强大,用法有点复杂,如果使用不好,会出现很多问题。有些问题,不像接口调用那么容易排查,从而导致系统的复杂度提升了。

如何解决?

1,重复消费问题的解决:

不管是由于生产者产生的重复消息,还是由于消费者导致的重复消息,我们都可以在消费者中这个问题。

这就要求消费者在做业务处理时,要做幂等设计;

在这里我推荐增加一张消费消息表,来解决mq的这类问题。消费消息表中,使用messageId做唯一索引,在处理业务逻辑之前,先根据messageId查询一下该消息有没有处理过,如果已经处理过了则直接返回成功,如果没有处理过,则继续做业务处理。

2,数据一致性的解决:

数据一致性分为:

强一致性

弱一致性

最终一致性

而mq为了性能考虑使用的是最终一致性,那么必定会出现数据不一致的问题。这类问题大概率是因为消费者读取消息后,业务逻辑处理失败导致的,这时候可以增加重试机制。

重试分为:同步重试 和 异步重试。

有些消息量比较小的业务场景,可以采用同步重试,在消费消息时如果处理失败,立刻重试3-5次,如何还是失败,则写入到记录表中。但如果消息量比较大,则不建议使用这种方式,因为如果出现网络异常,可能会导致大量的消息不断重试,影响消息读取速度,造成消息堆积。

而消息量比较大的业务场景,建议采用异步重试,在消费者处理失败之后,立刻写入重试表,有个job专门定时重试。

还有一种做法是,如果消费失败,自己给同一个topic发一条消息,在后面的某个时间点,自己又会消费到那条消息,起到了重试的效果。如果对消息顺序要求不高的场景,可以使用这种方式。

3,消息丢失问题

不管你是否承认有时候消息真的会丢,即使这种概率非常小,也会对业务有影响。生产者、mq服务器、消费者都有可能会导致消息丢失的问题。

为了解决这个问题,我们可以增加一张消息发送表,当生产者发完消息之后,会往该表中写入一条数据,状态status标记为待确认。如果消费者读取消息之后,调用生产者的api更新该消息的status为已确认。有个job,每隔一段时间检查一次消息发送表,如果5分钟(这个时间可以根据实际情况来定)后还有状态是待确认的消息,则认为该消息已经丢失了,重新发条消息。

4,消息顺序问题

消息顺序问题是我们非常常见的问题,我们以kafka消费订单消息为例。订单有:下单、支付、完成、退货等状态,这些状态是有先后顺序的,如果顺序错了会导致业务异常。

订单号路由到不同的partition,同一个订单号的消息,每次到发到同一个partition。

5,消息堆积

如果消费者消费消息的速度小于生产者生产消息的速度,将会出现消息堆积问题。其实这类问题产生的原因很多,如果你想进一步了解,可以看看我的另一篇文章《 我用kafka两年踩过的一些非比寻常的坑 》。

那么消息堆积问题该如何解决呢?

这个要看消息是否需要保证顺序。

如果不需要保证顺序,可以读取消息之后用多线程处理业务逻辑。

这样就能增加业务逻辑处理速度,解决消息堆积问题。但是线程池的核心线程数和最大线程数需要合理配置,不然可能会浪费系统资源。

如果需要保证顺序,可以读取消息之后,将消息按照一定的规则分发到多个队列中,然后在队列中用单线程处理。

1、打开RocketMQ的配置文件brokerconf,找到delayTimeLevel参数,将其值改为19,表示新增一个延迟级别。

2、在RocketMQ的安装目录下,找到conf目录下的delayproperties文件,新增一行配置,格式为19=2592000,表示将新增的延迟级别对应的延迟时间设置为2592000秒,即30天。

3、在producer发送消息时,将delayTimeLevel参数设置为19,即可将消息的延迟时间设置为30天。

在遇到与第三方系统做对接时,MQ无疑是非常好的解决方案(解耦、异步)。但是如果引入MQ组件,随之要考虑的问题就变多了,如何保证MQ消息能够正常被业务消费。所以引入MQ消费失败情况下,自动重试功能是非常重要的。这里不过细讲MQ有哪些原因会导致失败。

MQ重试,网上有方案一般采用的是,本地消息表+定时任务,不清楚的可以自行了解下。

我这里提供一种另外的思路,供大家参考。方案实现在RabbitMQ(安装延迟队列插件)+NET CORE 31

设计思路为:

内置一个专门做重试的队列,这个队列是一个延迟队列,当业务队列消费失败时,将原始消息投递至重试队列,并设置延迟时间,当延迟时间到达后。重试队列消费会自动将消息重新投递会业务队列,如此便可以实现消息的重试,而且可以根据重试次数来自定义重试时间,比如像微信支付回调一样(第一次延迟3S,第二次延迟10S,第三次延迟60S),上面方案当然要保证MQ消费采用ACK机制。

那么如何让重试队列知道原来的业务队列是哪个,我们定义业务队列时,可以通过MQ的消息头内置一些信息:队列类型(业务队列也有可能是延迟队列)、重试次数(默认为 0)、交换机名称、路由键。业务队列消费失败时,将消息投递至重试队列时,则可以把业务队列的消息头传递至重试队列,那么重试队列消费,重新将消息发送给业务队列时,则可以知道业务队列所需要的所有参数(需要将重试次数+1)。

下面结合代码讲下具体实现:

我们先看看业务队列发送消息时,如何定义

这里会内置上面描述的重试队列需要的参数

再来看看业务队列消费如何处理,这里因为会自动重试,所以保证业务队列每次都是消费成功的(MQ才会将消息从队列中删除)

我们再看看PublishRetry重试队列的推送方法如何实现

重试队列的消费者实现

然后在系统中,内置重试队列消费者

以上就是关于RocketMQ - 基于延迟消息机制优化大量订单的定时退款扫描问题全部的内容,包括:RocketMQ - 基于延迟消息机制优化大量订单的定时退款扫描问题、怎么样提高rabbitmq 发送消息的速度、紧急求助MQ中无法从消息队列中连续读出消息的问题的相关推荐等相关内容解答,如果想了解更多相关内容,可以关注我们,你们的支持是我们更新的动力!

欢迎分享,转载请注明来源:内存溢出

原文地址:https://54852.com/web/9618376.html

(0)
打赏 微信扫一扫微信扫一扫 支付宝扫一扫支付宝扫一扫
上一篇 2023-04-30
下一篇2023-04-30

发表评论

登录后才能评论

评论列表(0条)

    保存