RabbitMQ

RabbitMQ,第1张

一、docker安装RabbitMQ
cd /opt/rabbitmq
mkdir data
# 拉取镜像
docker pull rabbitmq
# 查看镜像
docker images
# 拉取镜像到本地仓库,这里是直接安装最新的,
# 如果需要安装其他版本在rabbitmq后面跟上版本号即可
# docker pull rabbitmq
# 启动rabbitMq
docker run -d \
-v /opt/rabbitmq/data:/var/lib/rabbitmq \
-p 5672:5672 -p 15672:15672 --name rabbitmq --restart=always \
--hostname myRabbit rabbitmq:3-management

# 启动rabbitmq_management, rabbitmq 为容器的名称,使用id也可以
docker exec -it rabbitmq rabbitmq-plugins enable rabbitmq_management
# ip+端口号登录,用户名和密码默认都是guest

浏览器访问http://ip:15672
初始账号密码 guest guest

官方使用docker安装的方式
这里直接设置了用户名和密码,而不再需要进入到容器内部设置。

如果您希望更改guest/的默认用户名和密码guest,可以使用RABBITMQ_DEFAULT_USER和RABBITMQ_DEFAULT_PASS环境变量进行更改

$ docker run -d --hostname my-rabbit --name some-rabbit -e RABBITMQ_DEFAULT_USER=user -e RABBITMQ_DEFAULT_PASS=password rabbitmq:3-management
二、RabbitMQ用户管理 2.1 逻辑结构
  • 用户
  • 虚拟主机
  • 队列
2.2 用户管理 2.2.1 命令行用户管理
  • 在linux中使用命令行创建用户

    ## 进入到rabbit_mq的sbin目录
    cd /usr/local/rabbitmq_server-3.7.0/sbin
    
    ## 新增用户
    ./rabbitmqctl add_user ytao admin123
    
  • 设置用户级别

    ## 用户级别:
    ## 1.administrator 可以登录控制台、查看所有信息、可以对RabbitMQ进行管理
    ## 2.monitoring 监控者 登录控制台、查看所有信息
    ## 3.policymaker 策略制定者 登录控制台、指定策略
    ## 4.managment 普通管理员 登录控制台
    
    ./rabbitmqctl set_user_tags ytao administrator
    
2.2.2 管理系统进行用户管理
  • 管理系统登录:访问http://47.96.11.185:15672/

    1.新增用户
2.创建虚拟主机
3.删除用户
4.用户绑定虚拟主机
三、RabbitMQ工作模式

RabbitMQ提供了多种消息的通信方式—工作模式

https://www.rabbitmq.com/getstarted.html

消息通信是由两个角色完成:消息生产者(producer)和 消息消费者(Consumer)

3.1 简单模式

一个队列只有一个消费者


生产者将消息发送到队列,消费者从队列取出数据

3.2 工作模式

多个消费者监听同一个队列

多个消费者监听同一个队列,但多个消费者中只有一个消费者会成功的消费消息
3.3 订阅模式

一个交换机绑定多个消息队列,每个消息队列有一个消费者监听

消息生产者发送的消息可以被每一个消费者接收
3.4 路由模式

一个交换机绑定多个消息队列,每个消息队列都由自己唯一的key,每个消息队列有一个消费者监听

四、RabbitMQ交换机和队列管理 4.1 创建队列

4.2 创建交换机

4.3 交换机绑定队列

五、在普通的Maven应用中使用MQ
RabbitMQ队列结构
5.1简单模式 5.1.1 消息生产者
  • 创建Maven项目

  • 添加RabbitMQ连接所需要的依赖

    
    <dependency>
        <groupId>com.rabbitmqgroupId>
        <artifactId>amqp-clientartifactId>
        <version>4.10.0version>
    dependency>
    
    <dependency>
        <groupId>org.slf4jgroupId>
        <artifactId>slf4j-log4j12artifactId>
        <version>1.7.25version>
        <scope>testscope>
    dependency>
    
    <dependency>
        <groupId>org.apache.commonsgroupId>
        <artifactId>commons-lang3artifactId>
        <version>3.9version>
    dependency>
    
  • 在resources目录下创建log4j.properties

    log4j.rootLogger=DEBUG,A1 log4j.logger.com.taotao = DEBUG 
    log4j.logger.org.mybatis = DEBUG
    log4j.appender.A1=org.apache.log4j.ConsoleAppender
    log4j.appender.A1.layout=org.apache.log4j.PatternLayout
    log4j.appender.A1.layout.ConversionPattern=%-d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c]-[%p] %m%n
    
  • 创建MQ连接帮助类

    package com.qfedu.mq.utils;
    
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class ConnectionUtil {
    
        public static Connection getConnection() throws IOException, TimeoutException {
            //1.创建连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            //2.在工厂对象中设置MQ的连接信息(ip,port,virtualhost,username,password)
            factory.setHost("47.96.11.185");
            factory.setPort(5672);
            factory.setVirtualHost("host1");
            factory.setUsername("ytao");
            factory.setPassword("admin123");
            //3.通过工厂对象获取与MQ的链接
            Connection connection = factory.newConnection();
            return connection;
        }
    
    }
    
  • 消息生产者发送消息

    package com.qfedu.mq.service;
    
    import com.qfedu.mq.utils.ConnectionUtil;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    
    public class SendMsg {
    
        public static void main(String[] args) throws Exception{
    
            String msg = "Hello HuangDaoJun!";
            Connection connection = ConnectionUtil.getConnection();   
            Channel channel = connection.createChannel();    
    
            //定义队列(使用Java代码在MQ中新建一个队列)
            //参数1:定义的队列名称
            //参数2:队列中的数据是否持久化(如果选择了持久化)
            //参数3: 是否排外(当前队列是否为当前连接私有)
            //参数4:自动删除(当此队列的连接数为0时,此队列会销毁(无论队列中是否还有数据))
            //参数5:设置当前队列的参数
            //channel.queueDeclare("queue7",false,false,false,null);
    
            //参数1:交换机名称,如果直接发送信息到队列,则交换机名称为""
            //参数2:目标队列名称
            //参数3:设置当前这条消息的属性(设置过期时间 10)
            //参数4:消息的内容
            channel.basicPublish("","queue7",null,msg.getBytes());
            System.out.println("发送:" + msg);
    
            channel.close();
            connection.close();
        }
    
    }
    
5.1.2 消息消费者
  • 创建Maven项目

  • 添加依赖

  • log4j.properties

  • ConnetionUtil.java

  • 消费者消费消息

    package com.qfedu.mq.service;
    
    import com.qfedu.mq.utils.ConnectionUtil;
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class ReceiveMsg {
    
        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
    
            Consumer consumer = new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, 
                            AMQP.BasicProperties properties, byte[] body) throws IOException {
                    //body就是从队列中获取的数据
                    String msg = new String(body);
                    System.out.println("接收:"+msg);
                }
            };
    
            channel.basicConsume("queue1",true,consumer);
        }
    }
    
5.2 工作模式

一个发送者多个消费者

5.2.1 发送者
public class SendMsg {

    public static void main(String[] args) throws Exception{
        System.out.println("请输入消息:");
        Scanner scanner = new Scanner(System.in);
        String msg = null;
        while(!"quit".equals(msg = scanner.nextLine())){
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();

            channel.basicPublish("","queue2",null,msg.getBytes());
            System.out.println("发送:" + msg);

            channel.close();
            connection.close();
        }
    }

}
5.2.2 消费者1
public class ReceiveMsg {

    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //body就是从队列中获取的数据
                String msg = new String(body);
                System.out.println("Consumer1接收:"+msg);
                if("wait".equals(msg)){
                    try {
                        Thread.sleep(10000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        };

        channel.basicConsume("queue2",true,consumer);
    }
}
5.2.3 消费者2
public class ReceiveMsg {

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //body就是从队列中获取的数据
                String msg = new String(body);
                System.out.println("Consumer2接收:"+msg);
            }
        };

        channel.basicConsume("queue2",true,consumer);
    }
}
5.3 订阅模式 5.3.1 发送者 发送消息到交换机
public class SendMsg {

    public static void main(String[] args) throws Exception{
        System.out.println("请输入消息:");
        Scanner scanner = new Scanner(System.in);
        String msg = null;
        while(!"quit".equals(msg = scanner.nextLine())){
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();

            channel.basicPublish("ex1","",null,msg.getBytes());
            System.out.println("发送:" + msg);

            channel.close();
            connection.close();
        }
    }

}
5.3.2 消费者1
public class ReceiveMsg1 {

    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //body就是从队列中获取的数据
                String msg = new String(body);
                System.out.println("Consumer1接收:"+msg);
                if("wait".equals(msg)){
                    try {
                        Thread.sleep(10000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        };

        channel.basicConsume("queue3",true,consumer);
    }
}
5.3.3 消费者2
public class ReceiveMsg2 {

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //body就是从队列中获取的数据
                String msg = new String(body);
                System.out.println("Consumer2接收:"+msg);
            }
        };

        channel.basicConsume("queue4",true,consumer);
    }
}
5.4 路由模式 5.4.1 发送者 发送消息到交换机
public class SendMsg {

    public static void main(String[] args) throws Exception{
        System.out.println("请输入消息:");
        Scanner scanner = new Scanner(System.in);
        String msg = null;
        while(!"quit".equals(msg = scanner.nextLine())){
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();

            if(msg.startsWith("a")){
                channel.basicPublish("ex2","a",null,msg.getBytes());
            }else if(msg.startsWith("b")){
                channel.basicPublish("ex2","b",null,msg.getBytes());
            }
            System.out.println("发送:" + msg);

            channel.close();
            connection.close();
        }
    }

}
5.4.2 消费者1
public class ReceiveMsg1 {

    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //body就是从队列中获取的数据
                String msg = new String(body);
                System.out.println("Consumer1接收:"+msg);
                if("wait".equals(msg)){
                    try {
                        Thread.sleep(10000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        };

        channel.basicConsume("queue5",true,consumer);
    }
}
5.4.3 消费者2
public class ReceiveMsg2 {

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //body就是从队列中获取的数据
                String msg = new String(body);
                System.out.println("Consumer2接收:"+msg);
            }
        };

        channel.basicConsume("queue6",true,consumer);
    }
}
六、在SpringBoot应用中使用MQ

SpringBoot应用可以完成自动配置及依赖注入——可以通过Spring直接提供与MQ的连接对象

6.1 消息生产者
  • 创建SpringBoot应用,添加依赖

  • | |
    | ---------------------------------------- |
    ||

  • 配置application.yml

    server:
      port: 9001
    spring:
      application:
        name: producer
      rabbitmq:
        host: 47.96.11.185
        port: 5672
        virtual-host: host1
        username: ytao
        password: admin123
    
  • 发送消息

    @Service
    public class TestService {
    
        @Resource
        private AmqpTemplate amqpTemplate;
    
        public void sendMsg(String msg){
    
            //1. 发送消息到队列
            amqpTemplate.convertAndSend("queue1",msg);
    
            //2. 发送消息到交换机(订阅交换机)
            amqpTemplate.convertAndSend("ex1","",msg);
    
            //3. 发送消息到交换机(路由交换机)
            amqpTemplate.convertAndSend("ex2","a",msg);
            
        }
    
    }
    
6.2 消息消费者
  • 创建项目添加依赖

  • 配置yml

  • 接收消息

    @Service
    //@RabbitListener(queues = {"queue1","queue2"})
    @RabbitListener(queues = "queue1")
    public class ReceiveMsgService {
    
        @RabbitHandler
        public void receiveMsg(String msg){
            System.out.println("接收MSG:"+msg);
        }
    
        //@RabbitHandler
        //public void receiveMsg(byte[] bs){
        //
        //}
    
    }
    
END:
  • RabbitMQ用户、交换机、队列管理
  • 基于普通Maven项目整合使用RabbitMQ(四种模式)
  • 基于SpringBoot应用整合使用RabbitMQ(四种模式)

欢迎分享,转载请注明来源:内存溢出

原文地址:https://54852.com/langs/743464.html

(0)
打赏 微信扫一扫微信扫一扫 支付宝扫一扫支付宝扫一扫
上一篇 2022-04-29
下一篇2022-04-29

发表评论

登录后才能评论

评论列表(0条)

    保存