
1、笨拙点方法,就是轮循,consume的阻塞监听可以设置timeout,通过设置一个较小的timeout,可以轮流监听几个channel,变相实现监听多个queue,对性能要求不是很高,可以使用这种方法
2、还有个办法就是先取出一个队列的消息数,然后循环的都读出后,转去读另一个队列,所有队列如果都没有消息了,就这样循环等待着
3、还有一个专业人士的回答,但是我还没有完全理解:
消费者(consumer)这是个业务层的概念,而消费或者说订阅(也就是 consume)是 AMQP 协议层的东西,所以,你问一个消费者能否订阅多个queue,答案是当然可以。方案也就一种,按照协议的流程分别向不同的 queue 进行 consume。至于是使用多线程方式来处理,还是使用事件驱动的方式(单线程)来处理这就取决于实现了。
如果对多线程控制能力不是很强,建议不要用这种方式,太专业了
4、这个问题后来我仔细想过,也许可以在应用层进行一下重新设计,可以用线程池作为多个consumer只读取出消息,不进行处理,然后publish进另一个队列,然后用由一个consumer来处理消息
在遇到与第三方系统做对接时,MQ无疑是非常好的解决方案(解耦、异步)。但是如果引入MQ组件,随之要考虑的问题就变多了,如何保证MQ消息能够正常被业务消费。所以引入MQ消费失败情况下,自动重试功能是非常重要的。这里不过细讲MQ有哪些原因会导致失败。
MQ重试,网上有方案一般采用的是,本地消息表+定时任务,不清楚的可以自行了解下。
我这里提供一种另外的思路,供大家参考。方案实现在RabbitMQ(安装延迟队列插件)+NET CORE 31
设计思路为:
内置一个专门做重试的队列,这个队列是一个延迟队列,当业务队列消费失败时,将原始消息投递至重试队列,并设置延迟时间,当延迟时间到达后。重试队列消费会自动将消息重新投递会业务队列,如此便可以实现消息的重试,而且可以根据重试次数来自定义重试时间,比如像微信支付回调一样(第一次延迟3S,第二次延迟10S,第三次延迟60S),上面方案当然要保证MQ消费采用ACK机制。
那么如何让重试队列知道原来的业务队列是哪个,我们定义业务队列时,可以通过MQ的消息头内置一些信息:队列类型(业务队列也有可能是延迟队列)、重试次数(默认为 0)、交换机名称、路由键。业务队列消费失败时,将消息投递至重试队列时,则可以把业务队列的消息头传递至重试队列,那么重试队列消费,重新将消息发送给业务队列时,则可以知道业务队列所需要的所有参数(需要将重试次数+1)。
下面结合代码讲下具体实现:
我们先看看业务队列发送消息时,如何定义
这里会内置上面描述的重试队列需要的参数
再来看看业务队列消费如何处理,这里因为会自动重试,所以保证业务队列每次都是消费成功的(MQ才会将消息从队列中删除)
我们再看看PublishRetry重试队列的推送方法如何实现
重试队列的消费者实现
然后在系统中,内置重试队列消费者
先从整体流程上简单梳理一下消息队列负载的过程。
消息队列负载由Rebalance线程默认每隔20s进行一次消息队列负载,获取主题队列信息mqSet与消费组当前所有消费者cidAll,然后按照某一种负载算法进行队列分配,分配原则为同一个消费者可以分配多个消息消息队列,同一个消息消费队列同一时间只会分配给一个消费者。此时,可以计算当前消费者分配到消息队列集合,对比原先的负载队列与当前的分配队列。如果新队列集合中不包含原来的队列,则停止原先队列消息消费并移除,如果原先队列中不包含新分配队列则创建PullRequest。
进行负载均衡是在RebalanceService线程中启动的,一个MQClientInstance持有一个RebalanceService实现,并随着MQClientInstance的启动而启动。
从上面可以看出,MQClientinstance遍历已注册的消费者,对消费者执行doRebalance方法。
上面是遍历订阅信息对每个主题的队列进行重新负载。接下来将执行 rebalanceByTopic 方法,会根据广播模式或集群模式分别采用不同的方法进行处理。在此处,只解释集群模式下的方法。
获取该主题下的队列信息和该消费组内当前所有的消费者ID。每个DefaultMQPushConsumerImpl都持有一个单独的RebalanceImpl对象。
对该主题下的队列信息和该消费组内当前所有的消费者ID进行排序,确保一个消费组的成员看到的顺序是一致的,防止同一个消费队列不会被多个消费者分配。
allocateResult 记录的是当前消费者的所分配的消息队列
调用 updateProcessQueueTableInRebalance 对比消息队列是否发生变化
从上面看, processQueueTable 记录的是当前消费者负载的消息队列缓存表,该方法里面的 mqSet 记录的的是当前消费者经过负载分配后的消息队列集合。如果 processQueueTable 中的消息队列在 mqSet 中不存在,说明该消息队列已经被分配给其他消费者,所以需要暂停该消息队列消息的消费,通过 pqsetDropped(true); 该语句即可。
然后通过 removeUnnecessaryMessageQueue方法判断是否该mq从缓存中移除。
之后,开始遍历本次负载分配给该消费者的消息队列结合mqSet。如果processQueueTable中没有包含该消息队列,表示这是本次新增加的消息队列。
首先从内存中移除该消息队列的消息进度,然后调用 computePullFromWhere 从磁盘中读取该消息队列的消费进度,创建一个PullRequest对象。
从上面看出,主要有三种计算消息进度的方法,有些大同小异。
首先从磁盘中获取该消息队列的消费进度,如果大于0,说明该消息队列已经被消费过了,下次消费从该位置继续消费。如果等于-1,尝试去 *** 作消息存储时间戳作为消费者启动的时间戳,如果能找到则返回找到的偏移量,找不到则返回0;如果小于-1,则说明该消息进度文件中存储了错误的偏移量,返回-1。
在该方法的最后,会调用 dispatchPullRequest 方法,将PullRequest加入到PullMessageService中,以唤醒PullMessageService线程,进行消息拉取。
到这里,消费者负载均衡方面就结束了。
以上就是关于RabbitMQ怎样能实现多个队列由一个消费者来接收消息全部的内容,包括:RabbitMQ怎样能实现多个队列由一个消费者来接收消息、MQ消费失败,自动重试思路,如何保证MQ消息正常被业务消费、RocketMQ消费者消息队列负载均衡等相关内容解答,如果想了解更多相关内容,可以关注我们,你们的支持是我们更新的动力!
欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)