Rabbit发布确认模式

Rabbit发布确认模式,第1张

Rabbit发布确认模式

以下介绍消息发布确认的三种模式:

1.单个确认模式
2.批量确认模式
3.异步批量确认模式

先创建一个连接工具类

package com.zevin.rabbitmq.utils;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.*;




public class RabbitMqUtils {
    public static Channel channel()throws Exception{
        //创建一个连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //设置工厂IP   目的:连接rabbitMQ队列
        factory.setHost("123.123.123");//此处为服务器ip
        //用户名
        factory.setUsername("admin");
        //密码
        factory.setPassword("1236548974");
        //创建连接
        Connection connection = factory.newConnection();
        //获取信道
        Channel channel = connection.createChannel();
        return channel;
    }
}

接着开始测试三种发布确认模式

package com.zevin.rabbitmq.four;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client./confirm/iCallback;
import com.zevin.rabbitmq.utils.RabbitMqUtils;

import java.util.UUID;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;


public class ConfirmMessage {
    //批量发消息的个数
    public  static  final  int MESSAGE_COUNT = 1000;

    public static void main(String[] args) throws Exception{
        /
        // * 2.批量确认模式
        
        // * 3.异步批量确认模式
        publishMessageAsync();

    }

    //单个确认
   public static  void  publishMessageIndividually()throws Exception{
       Channel channel = RabbitMqUtils.channel();
       //队列的声明
       String ququeName = UUID.randomUUID().toString();
       channel.queueDeclare(ququeName,true,false,false,null);
       //开启发布确认
       channel.confirmSelect();
       //开始时间
       long begin = System.currentTimeMillis();

       //批量发消息
       for (int i = 0; i < MESSAGE_COUNT; i++) {
           String messag = i + "";
           channel.basicPublish("",ququeName,null,messag.getBytes());
           //单个消息就马上进行发布确认
          boolean flag  =  channel.waitForConfirms();
          
       }
       //结束时间
       long end = System.currentTimeMillis();
       System.out.println("发布"+MESSAGE_COUNT+"个单独确认消息,耗时"+(end-begin));
   }

    //批量发布确认
    public static  void  publishMessageBatch()throws Exception{
        Channel channel = RabbitMqUtils.channel();
        //队列的声明
        String ququeName = UUID.randomUUID().toString();
        channel.queueDeclare(ququeName,true,false,false,null);
        //开启发布确认
        channel.confirmSelect();
        //开始时间
        long begin = System.currentTimeMillis();

        //批量确认消息大小
        int batchSize = 100;

        //批量发消息  (批量发布确认)!!!!!!

        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String message = i + "";
            channel.basicPublish("",ququeName,true,null,message.getBytes());

            if (i+1%batchSize == 0){
                channel.waitForConfirms();
            }
        }
        //结束时间
        long end = System.currentTimeMillis();
        System.out.println("发布"+MESSAGE_COUNT+"个批量确认消息,耗时"+(end-begin));
    }

    //异步发布 确认
    public  static  void  publishMessageAsync() throws Exception{

        Channel channel = RabbitMqUtils.channel();
        //队列的声明
        String ququeName = UUID.randomUUID().toString();
        channel.queueDeclare(ququeName,true,false,false,null);
        //开启发布确认
        channel.confirmSelect();

        
        ConcurrentSkipListMap outstandingConfirms = new ConcurrentSkipListMap<>();

        //消息确认成功回调函数
        
        ConfirmCallback ackcallback = (var1,var3)->{
            if (var3){
                //(2)删除掉已经确认的消息  剩下的就是未确认的消息
                ConcurrentNavigableMap confirmed = outstanding/confirm/is.headMap(var1);
                /confirm/ied.clear();
            }else {
                outstanding/confirm/is.remove(var1);
            }
            System.out.println("确认的消息"+var1);
        };
        //消息确认失败回调函数
        ConfirmCallback nackcallback = (var1,var3)->{
            //(3) 打印一下未确认的消息有哪些
            String message = outstanding/confirm/is.get(var1);
            System.out.println("未确认的消息是:"+message+":::::未确认的消息tag:"+var1);
        };
        
        //准备消息的监听器 监听哪些成功了 哪些消息失败了
        channel.addConfirmListener(ackcallback,nackcallback);

        //开始时间
        long begin = System.currentTimeMillis();
        //批量发送
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String message = "消息" + i ;
            channel.basicPublish("",ququeName,null,message.getBytes());
            //(1)记录下所有要发送的消息  消息的总和
            outstanding/confirm/is.put(channel.getNextPublishSeqNo(),message);

        }

        //结束时间
        long end = System.currentTimeMillis();
        System.out.println("发布"+MESSAGE_COUNT+"个异步发布确认消息,耗时"+(end-begin));
    }
}

 

欢迎分享,转载请注明来源:内存溢出

原文地址:https://54852.com/zaji/5722302.html

(0)
打赏 微信扫一扫微信扫一扫 支付宝扫一扫支付宝扫一扫
上一篇 2022-12-18
下一篇2022-12-18

发表评论

登录后才能评论

评论列表(0条)

    保存