
转载尚硅谷以及技术大佬博客,完整建议看B站尚硅谷rabbitmq视频
1.流量削峰
举个例子,如果订单系统最多能处理一万次订单,这个处理能力应付正常时段的下单时绰绰有余,正常时段我们下单一秒后就能返回结果。但是在高峰期,如果有两万次下单 *** 作系统是处理不了的,只能限制订单超过一万后不允许用户下单。使用消息队列做缓冲,我们可以取消这个限制,把一秒内下的订单分散成一段时间来处理,这时有些用户可能在下单十几秒后才能收到下单成功的 *** 作,但是比不能下单的体验要好。
2.应用解耦
以电商应用为例,应用中有订单系统、库存系统、物流系统、支付系统。用户创建订单后,如果耦合调用库存系统、物流系统、支付系统,任何一个子系统出了故障,都会造成下单 *** 作异常。当转变成基于消息队列的方式后,系统间调用的问题会减少很多,比如物流系统因为发生故障,需要几分钟来修复。在这几分钟的时间里,物流系统要处理的内存被缓存在消息队列中,用户的下单 *** 作可以正常完成。当物流系统恢复后,继续处理订单信息即可,中单用户感受不到物流系统的故障,提升系统的可用性。
3.异步处理
有些服务间调用是异步的,例如 A 调用 B,B 需要花费很长时间执行,但是 A 需要知道 B 什么时候可以执行完,以前一般有两种方式,A 过一段时间去调用 B 的查询 api 查询。或者 A 提供一个 callback api, B 执行完之后调用 api 通知 A 务。这两种方式都不是很优雅,使用消息总线,可以很方便解决这个问题,A 调用 B 服务后,只需要监听 B 处理完成的消息,当 B 处理完成后,会发送一条消息给 MQ,MQ 会将此消息转发给 A 服务。这样 A 服务既不用循环调用 B 的查询 api,也不用提供 callback api。同样 B 服务也不用做这些 *** 作。A 服务还能及时的得到异步处理成功的消息。
未完结
概念
生产者
产生数据发送消息的程序是生产者
交换机
交换机是 RabbitMQ 非常重要的一个部件,一方面它接收来自生产者的消息,另一方面它将消息推送到队列中。交换机必须确切知道如何处理它接收到的消息,是将这些消息推送到特定队列还是推送到多个队列,亦或者是把消息丢弃,这个得有交换机类型决定
队列
队列是 RabbitMQ 内部使用的一种数据结构,尽管消息流经 RabbitMQ 和应用程序,但它们只能存储在队列中。队列仅受主机的内存和磁盘限制的约束,本质上是一个大的消息缓冲区。许多生产者可以将消息发送到一个队列,许多消费者可以尝试从一个队列接收数据。这就是我们使用队列的方式
消费者
消费与接收具有相似的含义。消费者大多时候是一个等待接收消息的程序。请注意生产者,消费者和消息中间件很多时候并不在同一机器上。同一个应用程序既可以是生产者又是可以是消费者。
Broker:接收和分发消息的应用,RabbitMQ Server 就是 Message Broker
Virtual host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多个 vhost,每个用户在自己的 vhost 创建 exchange/queue 等
Connection:publisher/consumer 和 broker 之间的 TCP 连接
Channel:如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP ,Connection 的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个 thread 创建单独的 channel 进行通讯,AMQP method 包含了 channel id 帮助客户端和 message broker 识别 channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的Connection 极大减少了 *** 作系统建立 TCP connection 的开销
Exchange:message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到 queue 中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout
【
关于Exchange类型说明:总共有以下类型:直接(direct), 主题(topic) ,标题(headers) , 扇出(fanout),消息能路由发送到队列中其实是由 routingKey(bindingkey)绑定 key 指定的,如果它存在的话,routingKey 来表示也可称该参数为 binding key
1)fanout 这种类型非常简单。正如从名称中猜到的那样,它是将接收到的所有消息广播到它知道的所有队列中
2)direct 这种类型来进行替换,这种类型的工作方式是,消息只去到它绑定的routingKey 队列中去
3)topic 交换机的消息的 routing_key 不能随意写,必须满足一定的要求,它必须是一个单词列表,以点号分隔开。这些单词可以是任意单词,比如说:“stock.usd.nyse”, “nyse.vmw”, “quick.orange.rabbit”.这种类型的。当然这个单词列表最多不能超过 255 个字节
*(星号)可以代替一个单词
#(井号)可以替代零个或多个单词
当一个队列绑定键是#,那么这个队列将接收所有数据,就有点像 fanout 了;
如果队列绑定键当中没有#和*出现,那么该队列绑定类型就是 direct 了
】
Queue:消息最终被送到这里等待 consumer 取走
Binding:exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key,Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据
1.windows下安装rabbitmq
https://www.cnblogs.com/saryli/p/9729591.html
2.maven导入依赖
org.springframework.boot spring-boot-starter-amqp
3.配置文件文件中配置
spring.rabbitmq.host=127.0.0.1 spring.rabbitmq.port=5672 spring.rabbitmq.virtual-host=/
4.主启动类上加上@EnableRabbit注解
5.编写测试类
package com.example.demo;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.redis.core.RedisTemplate;
@SpringBootTest
class DemoApplicationTests {
@Autowired
AmqpAdmin amqpAdmin;
@Autowired
RabbitTemplate rabbitTemplate;
@Test
void creatExchange() {
DirectExchange mydirect = new DirectExchange("mydirect", true, false);
amqpAdmin.declareExchange(mydirect);
}
@Test
void createQueue(){
Queue queue = new Queue("mybe");
amqpAdmin.declareQueue(queue);
}
@Test
void binding(){
//String destination, Binding.DestinationType destinationType队列绑定, String exchange, String routingKey, @Nullable Map arguments
Binding binding = new Binding("mybe", Binding.DestinationType.QUEUE,"mydirect","mybe",null);
amqpAdmin.declareBinding(binding);
}
}
6.controller测试类编写
package com.example.demo.controller;
import com.alibaba.fastjson.JSONObject;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
@Controller
@RequestMapping("/rabbit")
public class RabbitController {
@Autowired
RabbitTemplate rabbitTemplate;
@RequestMapping("/test1")
@ResponseBody
public String test1(){
for (int i = 0;i<10;i++){
JSONObject json = new JSONObject();
json.put("你好", i);
json.put("儿童团", "网规划局");
rabbitTemplate.convertAndSend("mydirect", "mybe", json.toString());
System.out.println("发送了消息"+i);
}
return "ok";
}
}
7.消息监听类
package com.example.demo.service.impl;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
import java.io.IOException;
@Service
public class RabbitmqTest1Impl {
@RabbitListener(queues = {"mybe"})
public void getMessage(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
byte[] body = message.getBody();
MessageProperties messageProperties = message.getMessageProperties();
Thread.sleep(2000);
String s = new String(body);
System.out.println("消费消息完成"+s);
}
}
8.控制台打印
关于rabbitmq消息应答机制
(1)生产者方面:生产者发送消息至MQ的数据丢失
(2)RabbitMQ方面:MQ收到消息,暂存内存中,还没消费,自己挂掉,数据会都丢失
(3)消费者方面:消费者刚拿到消息,还没处理,挂掉了,MQ又以为消费者处理完
1.配置文件中添加
#消息已发送到交换机(Exchange)时返回 spring.rabbitmq.publisher-/confirm/i-type=correlated # 消息在未被队列收到的情况下返回 spring.rabbitmq.template.mandatory=true spring.rabbitmq.publisher-returns=true # 开启消息手动确认机制 spring.rabbitmq.listener.simple.acknowledge-mode=manual
2.config类配置
package com.example.demo.config;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;
@Configuration
public class RabbitmqConfig {
@Autowired
AmqpAdmin amqpAdmin;
@Autowired
RabbitTemplate rabbitTemplate;
@Bean
public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){
RabbitTemplate rabbitTemplate = new RabbitTemplate();
rabbitTemplate.setConnectionFactory(connectionFactory);
//发送到exchange时调用回调函数
rabbitTemplate.set/confirm/iCallback(new RabbitTemplate./confirm/iCallback() {
@Override
public void /confirm/i(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("/confirm/iCallback:"+"相关数据:"+correlationData+" /confirm/iCallback:"+"确认情况:"+ack+" /confirm/iCallback:"+"原因:"+cause);
}
});
//设置消息抵达队列的失败回调
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returned) {
System.out.println("二位热热我热太热");
}
});
return rabbitTemplate;
}
@Bean
public void createNormalExchange(){
DirectExchange mydirect = new DirectExchange("mydirect3", true, false);
amqpAdmin.declareExchange(mydirect);
}
}
3.监听类改写
package com.example.demo.service.impl;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
import java.io.IOException;
@Service
public class RabbitmqTest2Impl {
@RabbitListener(queues = {"mybe"})
public void getMessage(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
byte[] body = message.getBody();
MessageProperties messageProperties = message.getMessageProperties();
Thread.sleep(2000);
String s = new String(body);
System.out.println("消费消息完成"+s);
channel.basicAck(deliveryTag,false);
} catch (IOException e) {
//消息消费方错误后的处理
//deliveryTag消息id
//multiple – true to reject all messages up to and including the supplied delivery tag; false to reject just the supplied delivery tag.
// requeue是否重新入队
channel.basicNack(deliveryTag,false,false);
e.printStackTrace();
}
}
}
关于死信队列
比如说: 用户在商城下单成功并点击去支付后在指定时间未支付时自动失效
死信的来源
消息 TTL 过期
队列达到最大长度(队列满了,无法再添加数据到 mq 中)
消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false.
关于延时队列
1.订单在十分钟之内未支付则自动取消
2.新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒
第一种是在创建队列的时候设置队列的“x-message-ttl”属性,另一种方式便是针对每条消息设置 TTL
如果设置了队列的 TTL 属性,那么一旦消息过期,就会被队列丢弃(如果配置了死信队列被丢到死信队列中),而第二种方式,消息即使过期,也不一定会被马上丢弃,因为消息是否过期是在即将投递到消费者之前判定的,如果当前队列有严重的消息积压情况,则已过期的消息也许还能存活较长时间;另外,还需要注意的一点是,如果不设置 TTL,表示消息永远不会过期,如果将 TTL 设置为 0,则表示除非此时可以直接投递该消息到消费者,否则该消息将会被丢弃
大概的示意图
代码
1.RabbitmqConfig中添加交换机、队列、绑定关系
@Bean
public void createExchange1(){
DirectExchange mydirect = new DirectExchange("order-event-exchage", true, false);
amqpAdmin.declareExchange(mydirect);
}
@Bean
public void createQueue1(){
//死信队列 这个队列无需监听
HashMap hashMap = new HashMap<>();
hashMap.put("x-dead-letter-exchange", "order-event-exchage"); //死信后交给的交换机
hashMap.put("x-dead-letter-routing-key", "order.release.order"); //死信后交给的路由键
hashMap.put("x-message-ttl",6000);//过这个时间后变成死信
Queue queue1 = new Queue("order.delay.queue", true, false, false, hashMap);
amqpAdmin.declareQueue(queue1);
//消费死信的队列 监听这个队列
Queue queue2 = new Queue("order.release.order.queue", true, false, false);
amqpAdmin.declareQueue(queue2);
}
@Bean
public void binding1(){
Binding binding1 = new Binding("order.delay.queue", Binding.DestinationType.QUEUE,"order-event-exchage","order.delay",null);
amqpAdmin.declareBinding(binding1);
Binding binding2 = new Binding("order.release.order.queue", Binding.DestinationType.QUEUE,"order-event-exchage","order.release.order",null);
amqpAdmin.declareBinding(binding2);
}
2.创建一个新的监听类
package com.example.demo.service.impl;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
import java.io.IOException;
@Service
public class RabbitmqTest4Impl {
@RabbitListener(queues = {"order.release.order.queue"})
public void getMessage(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
byte[] body = message.getBody();
MessageProperties messageProperties = message.getMessageProperties();
String s = new String(body);
System.out.println("我已成功消费死信"+s);
channel.basicAck(deliveryTag,false);
} catch (IOException e) {
e.printStackTrace();
}
}
}
3.一个新的测试方法
@RequestMapping("/test2")
@ResponseBody
public String test2(){
JSONObject json = new JSONObject();
json.put("单点的", "postman");
json.put("1", "嗡嗡嗡");
rabbitTemplate.convertAndSend("order-event-exchage", "order.delay", json.toString());
System.out.println("发送了消息");
return "ok";
}
4.控制台打印
关于幂等性问题
消费者在消费 MQ 中的消息时,MQ 已把消息发送给消费者,消费者在给 MQ 返回 ack 时网络中断,故 MQ 未收到确认信息,该条消息会重新发给其他的消费者,或者在网络重连后再次发送给该消费者,但实际上该消费者已成功消费了该条消息,造成消费者消费了重复的消息
解决思路
MQ 消费者的幂等性的解决一般使用全局 ID 或者写个唯一标识比如时间戳 或者 UUID 或者订单消费者消费 MQ 中的消息也可利用 MQ 的该 id 来判断,或者可按自己的规则生成一个全局唯一 id,每次消费消息时用该 id 先判断该消息是否已消费过。
优先级队列
欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)