RabbitMQ的简单使用

RabbitMQ的简单使用,第1张

RabbitMQ的简单使用

转载尚硅谷以及技术大佬博客,完整建议看B站尚硅谷rabbitmq视频

1.流量削峰
举个例子,如果订单系统最多能处理一万次订单,这个处理能力应付正常时段的下单时绰绰有余,正常时段我们下单一秒后就能返回结果。但是在高峰期,如果有两万次下单 *** 作系统是处理不了的,只能限制订单超过一万后不允许用户下单。使用消息队列做缓冲,我们可以取消这个限制,把一秒内下的订单分散成一段时间来处理,这时有些用户可能在下单十几秒后才能收到下单成功的 *** 作,但是比不能下单的体验要好。

2.应用解耦
以电商应用为例,应用中有订单系统、库存系统、物流系统、支付系统。用户创建订单后,如果耦合调用库存系统、物流系统、支付系统,任何一个子系统出了故障,都会造成下单 *** 作异常。当转变成基于消息队列的方式后,系统间调用的问题会减少很多,比如物流系统因为发生故障,需要几分钟来修复。在这几分钟的时间里,物流系统要处理的内存被缓存在消息队列中,用户的下单 *** 作可以正常完成。当物流系统恢复后,继续处理订单信息即可,中单用户感受不到物流系统的故障,提升系统的可用性。

3.异步处理
有些服务间调用是异步的,例如 A 调用 B,B 需要花费很长时间执行,但是 A 需要知道 B 什么时候可以执行完,以前一般有两种方式,A 过一段时间去调用 B 的查询 api 查询。或者 A 提供一个 callback api, B 执行完之后调用 api 通知 A 务。这两种方式都不是很优雅,使用消息总线,可以很方便解决这个问题,A 调用 B 服务后,只需要监听 B 处理完成的消息,当 B 处理完成后,会发送一条消息给 MQ,MQ 会将此消息转发给 A 服务。这样 A 服务既不用循环调用 B 的查询 api,也不用提供 callback api。同样 B 服务也不用做这些 *** 作。A 服务还能及时的得到异步处理成功的消息。

未完结
概念

生产者
产生数据发送消息的程序是生产者

交换机
交换机是 RabbitMQ 非常重要的一个部件,一方面它接收来自生产者的消息,另一方面它将消息推送到队列中。交换机必须确切知道如何处理它接收到的消息,是将这些消息推送到特定队列还是推送到多个队列,亦或者是把消息丢弃,这个得有交换机类型决定

队列
队列是 RabbitMQ 内部使用的一种数据结构,尽管消息流经 RabbitMQ 和应用程序,但它们只能存储在队列中。队列仅受主机的内存和磁盘限制的约束,本质上是一个大的消息缓冲区。许多生产者可以将消息发送到一个队列,许多消费者可以尝试从一个队列接收数据。这就是我们使用队列的方式

消费者
消费与接收具有相似的含义。消费者大多时候是一个等待接收消息的程序。请注意生产者,消费者和消息中间件很多时候并不在同一机器上。同一个应用程序既可以是生产者又是可以是消费者。


Broker:接收和分发消息的应用,RabbitMQ Server 就是 Message Broker

Virtual host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多个 vhost,每个用户在自己的 vhost 创建 exchange/queue 等

Connection:publisher/consumer 和 broker 之间的 TCP 连接

Channel:如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP ,Connection 的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个 thread 创建单独的 channel 进行通讯,AMQP method 包含了 channel id 帮助客户端和 message broker 识别 channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的Connection 极大减少了 *** 作系统建立 TCP connection 的开销

Exchange:message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到 queue 中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout


关于Exchange类型说明:总共有以下类型:直接(direct), 主题(topic) ,标题(headers) , 扇出(fanout),消息能路由发送到队列中其实是由 routingKey(bindingkey)绑定 key 指定的,如果它存在的话,routingKey 来表示也可称该参数为 binding key

1)fanout 这种类型非常简单。正如从名称中猜到的那样,它是将接收到的所有消息广播到它知道的所有队列中
2)direct 这种类型来进行替换,这种类型的工作方式是,消息只去到它绑定的routingKey 队列中去
3)topic 交换机的消息的 routing_key 不能随意写,必须满足一定的要求,它必须是一个单词列表,以点号分隔开。这些单词可以是任意单词,比如说:“stock.usd.nyse”, “nyse.vmw”, “quick.orange.rabbit”.这种类型的。当然这个单词列表最多不能超过 255 个字节

*(星号)可以代替一个单词
#(井号)可以替代零个或多个单词

当一个队列绑定键是#,那么这个队列将接收所有数据,就有点像 fanout 了;
如果队列绑定键当中没有#和*出现,那么该队列绑定类型就是 direct 了


Queue:消息最终被送到这里等待 consumer 取走

Binding:exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key,Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据

1.windows下安装rabbitmq

https://www.cnblogs.com/saryli/p/9729591.html

2.maven导入依赖

		
			org.springframework.boot
			spring-boot-starter-amqp
		

3.配置文件文件中配置

spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/

4.主启动类上加上@EnableRabbit注解

5.编写测试类

package com.example.demo;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.redis.core.RedisTemplate;

@SpringBootTest
class DemoApplicationTests {

	@Autowired
	AmqpAdmin amqpAdmin;

	@Autowired
	RabbitTemplate rabbitTemplate;
	
	@Test
	void creatExchange() {
		DirectExchange mydirect = new DirectExchange("mydirect", true, false);
		amqpAdmin.declareExchange(mydirect);
	}

	
	@Test
	void createQueue(){
		Queue queue = new Queue("mybe");
		amqpAdmin.declareQueue(queue);
	}

	
	@Test
	void binding(){
		//String destination, Binding.DestinationType destinationType队列绑定, String exchange, String routingKey, @Nullable Map arguments
		Binding binding = new Binding("mybe", Binding.DestinationType.QUEUE,"mydirect","mybe",null);
		amqpAdmin.declareBinding(binding);
	}
}

6.controller测试类编写

package com.example.demo.controller;

import com.alibaba.fastjson.JSONObject;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;

@Controller
@RequestMapping("/rabbit")
public class RabbitController {
    @Autowired
    RabbitTemplate rabbitTemplate;

    @RequestMapping("/test1")
    @ResponseBody
    public String test1(){
            for (int i = 0;i<10;i++){
                JSONObject json = new JSONObject();
                json.put("你好", i);
                json.put("儿童团", "网规划局");

                rabbitTemplate.convertAndSend("mydirect", "mybe", json.toString());
                System.out.println("发送了消息"+i);
            }
        return "ok";
    }
}

7.消息监听类

package com.example.demo.service.impl;


import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

import java.io.IOException;

@Service
public class RabbitmqTest1Impl {

    @RabbitListener(queues = {"mybe"})
    public void getMessage(Message message, Channel channel) throws Exception {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();

        byte[] body = message.getBody();
        MessageProperties messageProperties = message.getMessageProperties();
        Thread.sleep(2000);
        String s = new String(body);
        System.out.println("消费消息完成"+s);

    }
}


8.控制台打印

关于rabbitmq消息应答机制

(1)生产者方面:生产者发送消息至MQ的数据丢失
(2)RabbitMQ方面:MQ收到消息,暂存内存中,还没消费,自己挂掉,数据会都丢失
(3)消费者方面:消费者刚拿到消息,还没处理,挂掉了,MQ又以为消费者处理完

1.配置文件中添加

#消息已发送到交换机(Exchange)时返回
spring.rabbitmq.publisher-/confirm/i-type=correlated
# 消息在未被队列收到的情况下返回
spring.rabbitmq.template.mandatory=true
spring.rabbitmq.publisher-returns=true 
# 开启消息手动确认机制
spring.rabbitmq.listener.simple.acknowledge-mode=manual

2.config类配置

package com.example.demo.config;


import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;

@Configuration
public class RabbitmqConfig {
    @Autowired
    AmqpAdmin amqpAdmin;

    @Autowired
    RabbitTemplate rabbitTemplate;

    @Bean
    public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){
        RabbitTemplate rabbitTemplate = new RabbitTemplate();
        rabbitTemplate.setConnectionFactory(connectionFactory);
        //发送到exchange时调用回调函数

        rabbitTemplate.set/confirm/iCallback(new RabbitTemplate./confirm/iCallback() {
            @Override
            public void /confirm/i(CorrelationData correlationData, boolean ack, String cause) {
                System.out.println("/confirm/iCallback:"+"相关数据:"+correlationData+" /confirm/iCallback:"+"确认情况:"+ack+" /confirm/iCallback:"+"原因:"+cause);

            }
        });

        //设置消息抵达队列的失败回调
        rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
            @Override
            public void returnedMessage(ReturnedMessage returned) {
                System.out.println("二位热热我热太热");
            }
        });
        return rabbitTemplate;
    }

    
    @Bean
    public void createNormalExchange(){
        DirectExchange mydirect = new DirectExchange("mydirect3", true, false);
        amqpAdmin.declareExchange(mydirect);
    }
}

3.监听类改写

package com.example.demo.service.impl;


import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

import java.io.IOException;

@Service
public class RabbitmqTest2Impl {

    @RabbitListener(queues = {"mybe"})
    public void getMessage(Message message, Channel channel) throws Exception {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            byte[] body = message.getBody();
            MessageProperties messageProperties = message.getMessageProperties();
            Thread.sleep(2000);
            String s = new String(body);
            System.out.println("消费消息完成"+s);

            channel.basicAck(deliveryTag,false);
        } catch (IOException e) {
            //消息消费方错误后的处理
            //deliveryTag消息id
            //multiple – true to reject all messages up to and including the supplied delivery tag; false to reject just the supplied delivery tag.
            // requeue是否重新入队
            channel.basicNack(deliveryTag,false,false);

            e.printStackTrace();
        }
    }
}

关于死信队列
比如说: 用户在商城下单成功并点击去支付后在指定时间未支付时自动失效

死信的来源
消息 TTL 过期
队列达到最大长度(队列满了,无法再添加数据到 mq 中)
消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false.

关于延时队列

1.订单在十分钟之内未支付则自动取消
2.新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒

第一种是在创建队列的时候设置队列的“x-message-ttl”属性,另一种方式便是针对每条消息设置 TTL

如果设置了队列的 TTL 属性,那么一旦消息过期,就会被队列丢弃(如果配置了死信队列被丢到死信队列中),而第二种方式,消息即使过期,也不一定会被马上丢弃,因为消息是否过期是在即将投递到消费者之前判定的,如果当前队列有严重的消息积压情况,则已过期的消息也许还能存活较长时间;另外,还需要注意的一点是,如果不设置 TTL,表示消息永远不会过期,如果将 TTL 设置为 0,则表示除非此时可以直接投递该消息到消费者,否则该消息将会被丢弃

大概的示意图

代码
1.RabbitmqConfig中添加交换机、队列、绑定关系

    
    @Bean
    public void createExchange1(){
        DirectExchange mydirect = new DirectExchange("order-event-exchage", true, false);
        amqpAdmin.declareExchange(mydirect);
    }

    
    @Bean
    public void createQueue1(){

        //死信队列  这个队列无需监听
        HashMap hashMap = new HashMap<>();
        hashMap.put("x-dead-letter-exchange", "order-event-exchage"); //死信后交给的交换机
        hashMap.put("x-dead-letter-routing-key", "order.release.order"); //死信后交给的路由键
        hashMap.put("x-message-ttl",6000);//过这个时间后变成死信
        Queue queue1 = new Queue("order.delay.queue", true, false, false, hashMap);
        amqpAdmin.declareQueue(queue1);

        //消费死信的队列  监听这个队列
        Queue queue2 = new Queue("order.release.order.queue", true, false, false);
        amqpAdmin.declareQueue(queue2);
    }

    
    @Bean
    public void binding1(){
        Binding binding1 = new Binding("order.delay.queue", Binding.DestinationType.QUEUE,"order-event-exchage","order.delay",null);
        amqpAdmin.declareBinding(binding1);

        Binding binding2 = new Binding("order.release.order.queue", Binding.DestinationType.QUEUE,"order-event-exchage","order.release.order",null);
        amqpAdmin.declareBinding(binding2);
    }

2.创建一个新的监听类

package com.example.demo.service.impl;


import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

import java.io.IOException;

@Service
public class RabbitmqTest4Impl {

    @RabbitListener(queues = {"order.release.order.queue"})
    public void getMessage(Message message, Channel channel) throws Exception {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            byte[] body = message.getBody();
            MessageProperties messageProperties = message.getMessageProperties();
            String s = new String(body);
            System.out.println("我已成功消费死信"+s);

            channel.basicAck(deliveryTag,false);
        } catch (IOException e) {

            e.printStackTrace();
        }
    }
}

3.一个新的测试方法

    @RequestMapping("/test2")
    @ResponseBody
    public String test2(){
        JSONObject json = new JSONObject();
        json.put("单点的", "postman");
        json.put("1", "嗡嗡嗡");
        rabbitTemplate.convertAndSend("order-event-exchage", "order.delay", json.toString());
        System.out.println("发送了消息");
        return "ok";
    }

4.控制台打印

关于幂等性问题
消费者在消费 MQ 中的消息时,MQ 已把消息发送给消费者,消费者在给 MQ 返回 ack 时网络中断,故 MQ 未收到确认信息,该条消息会重新发给其他的消费者,或者在网络重连后再次发送给该消费者,但实际上该消费者已成功消费了该条消息,造成消费者消费了重复的消息

解决思路
MQ 消费者的幂等性的解决一般使用全局 ID 或者写个唯一标识比如时间戳 或者 UUID 或者订单消费者消费 MQ 中的消息也可利用 MQ 的该 id 来判断,或者可按自己的规则生成一个全局唯一 id,每次消费消息时用该 id 先判断该消息是否已消费过。

优先级队列


欢迎分享,转载请注明来源:内存溢出

原文地址:https://54852.com/zaji/5709440.html

(0)
打赏 微信扫一扫微信扫一扫 支付宝扫一扫支付宝扫一扫
上一篇 2022-12-17
下一篇2022-12-17

发表评论

登录后才能评论

评论列表(0条)

    保存