RabbitMQ的相关 *** 作4--发布确认

RabbitMQ的相关 *** 作4--发布确认,第1张

RabbitMQ的相关 *** 作4--发布确认

目录
  • 1.预备 *** 作
  • 2. 设置批量发消息的个数
  • 3. 编写单个确认代码
  • 4. 编写批量确认代码
  • 5. 编写异步发布确认代码
  • 6. 运行测试代码

1.预备 *** 作

与 RabbitMQ的相关 *** 作2–轮训分发消息中前两步一致,引入相关依赖,并编写创建信道的工具类代码

2. 设置批量发消息的个数
	//批量发消息的个数
    public static final int MESSAGE_COUNT = 1000;
3. 编写单个确认代码
//单个确认
    public static void publicMessageIndividually() throws IOException, TimeoutException, InterruptedException {
        Channel channel = RabbitMqUtils.getChannel();
        String queueName = UUID.randomUUID().toString();
        channel.queueDeclare(queueName,false,false,false,null);
        //开启发布确认
        channel./confirm/iSelect();
        //开始时间
        long begin = System.currentTimeMillis();
        //批量发消息
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String message = i + "";
            channel.basicPublish("",queueName,null,message.getBytes());
            //单个消息马上进行发布确认
            boolean flag = channel.waitFor/confirm/is();
            if (flag){
                System.out.println("消息发送成功:" + message);
            }
        }
        //结束时间
        long end = System.currentTimeMillis();
        System.out.println("单个确认耗费时间:"+(end-begin) + "ms");
    }
4. 编写批量确认代码
    //批量确认
    public static void publicMessageBatch() throws IOException, TimeoutException, InterruptedException {
        Channel channel = RabbitMqUtils.getChannel();
        String queueName = UUID.randomUUID().toString();
        channel.queueDeclare(queueName,false,false,false,null);
        //开启发布确认
        channel./confirm/iSelect();
        //开始时间
        long begin = System.currentTimeMillis();
        //批量确认消息大小
        int batchSize = 100;
        //批量发消息
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String message = i + "";
            channel.basicPublish("",queueName,null,message.getBytes());
            //批量确认,100条确认一次
            if ((i+1) % batchSize == 0 && channel.waitFor/confirm/is()){
                System.out.println("消息发送成功");
            }
        }

        //结束时间
        long end = System.currentTimeMillis();
        System.out.println("批量确认耗费时间:"+(end-begin) + "ms");
    }
5. 编写异步发布确认代码
    //异步发布确认
    public static void publicMessageAsync() throws IOException, TimeoutException{
        Channel channel = RabbitMqUtils.getChannel();
        String queueName = UUID.randomUUID().toString();
        channel.queueDeclare(queueName,false,false,false,null);
        //开启发布确认
        channel./confirm/iSelect();

        
        ConcurrentSkipListMap outstandingConfirms = new ConcurrentSkipListMap<>();

        
        /confirm/iCallback ackCallback = (deliveryTag,mutiple) -> {
            if (mutiple){//如果是累计确认,就累计删除。headMap,返回此映射的部分视图,其键值严格小于k.
                //2:删除已经确认的消息,剩下的就算未确认的消息
                ConcurrentNavigableMap confirmed = outstanding/confirm/is.headMap(deliveryTag);
                /confirm/ied.clear();
            }else {//累计确认可能会有消息丢失,一般用这个单个确认
                outstanding/confirm/is.remove(deliveryTag);
            }
            System.out.println("确认的消息:"+deliveryTag);
        };
        
        /confirm/iCallback nackCallback = (deliveryTag,mutiple) -> {
            //3:编写未确认的消息
            String message = outstanding/confirm/is.get(deliveryTag);
            System.out.println("未确认的消息是:"+message + "--未确认的消息tag:" +deliveryTag);
        };

        //准备消息的监听器,监听哪些消息成功或失败
        channel.add/confirm/iListener(ackCallback,nackCallback);   //异步通知

        //开始时间
        long begin = System.currentTimeMillis();

        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String message = ""+i;
            channel.basicPublish("",queueName,null,message.getBytes());

            //1:此处记录下所有要发送的消息(序号,信息)
            outstanding/confirm/is.put(channel.getNextPublishSeqNo(),message);
        }

        //结束时间
        long end = System.currentTimeMillis();
        System.out.println("异步确认耗费时间:"+(end-begin) + "ms");
    }
6. 运行测试代码
    public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {
        //1.单个确认
//        publicMessageIndividually();    //单个确认耗费时间:10864ms
        //2.批量确认
//        publicMessageBatch();   //批量确认耗费时间:168ms
        //3.异步批量确认
        publicMessageAsync();   //异步确认耗费时间:25ms
    }

可以发现,异步确认消耗的时间远小于其他两种

欢迎分享,转载请注明来源:内存溢出

原文地址:https://54852.com/zaji/5618700.html

(0)
打赏 微信扫一扫微信扫一扫 支付宝扫一扫支付宝扫一扫
上一篇 2022-12-15
下一篇2022-12-15

发表评论

登录后才能评论

评论列表(0条)

    保存