
以下介绍消息发布确认的三种模式:
1.单个确认模式 2.批量确认模式 3.异步批量确认模式
先创建一个连接工具类
package com.zevin.rabbitmq.utils;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.*;
public class RabbitMqUtils {
public static Channel channel()throws Exception{
//创建一个连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置工厂IP 目的:连接rabbitMQ队列
factory.setHost("123.123.123");//此处为服务器ip
//用户名
factory.setUsername("admin");
//密码
factory.setPassword("1236548974");
//创建连接
Connection connection = factory.newConnection();
//获取信道
Channel channel = connection.createChannel();
return channel;
}
}
接着开始测试三种发布确认模式
package com.zevin.rabbitmq.four;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client./confirm/iCallback;
import com.zevin.rabbitmq.utils.RabbitMqUtils;
import java.util.UUID;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
public class ConfirmMessage {
//批量发消息的个数
public static final int MESSAGE_COUNT = 1000;
public static void main(String[] args) throws Exception{
/
// * 2.批量确认模式
// * 3.异步批量确认模式
publishMessageAsync();
}
//单个确认
public static void publishMessageIndividually()throws Exception{
Channel channel = RabbitMqUtils.channel();
//队列的声明
String ququeName = UUID.randomUUID().toString();
channel.queueDeclare(ququeName,true,false,false,null);
//开启发布确认
channel.confirmSelect();
//开始时间
long begin = System.currentTimeMillis();
//批量发消息
for (int i = 0; i < MESSAGE_COUNT; i++) {
String messag = i + "";
channel.basicPublish("",ququeName,null,messag.getBytes());
//单个消息就马上进行发布确认
boolean flag = channel.waitForConfirms();
}
//结束时间
long end = System.currentTimeMillis();
System.out.println("发布"+MESSAGE_COUNT+"个单独确认消息,耗时"+(end-begin));
}
//批量发布确认
public static void publishMessageBatch()throws Exception{
Channel channel = RabbitMqUtils.channel();
//队列的声明
String ququeName = UUID.randomUUID().toString();
channel.queueDeclare(ququeName,true,false,false,null);
//开启发布确认
channel.confirmSelect();
//开始时间
long begin = System.currentTimeMillis();
//批量确认消息大小
int batchSize = 100;
//批量发消息 (批量发布确认)!!!!!!
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = i + "";
channel.basicPublish("",ququeName,true,null,message.getBytes());
if (i+1%batchSize == 0){
channel.waitForConfirms();
}
}
//结束时间
long end = System.currentTimeMillis();
System.out.println("发布"+MESSAGE_COUNT+"个批量确认消息,耗时"+(end-begin));
}
//异步发布 确认
public static void publishMessageAsync() throws Exception{
Channel channel = RabbitMqUtils.channel();
//队列的声明
String ququeName = UUID.randomUUID().toString();
channel.queueDeclare(ququeName,true,false,false,null);
//开启发布确认
channel.confirmSelect();
ConcurrentSkipListMap outstandingConfirms = new ConcurrentSkipListMap<>();
//消息确认成功回调函数
ConfirmCallback ackcallback = (var1,var3)->{
if (var3){
//(2)删除掉已经确认的消息 剩下的就是未确认的消息
ConcurrentNavigableMap confirmed = outstanding/confirm/is.headMap(var1);
/confirm/ied.clear();
}else {
outstanding/confirm/is.remove(var1);
}
System.out.println("确认的消息"+var1);
};
//消息确认失败回调函数
ConfirmCallback nackcallback = (var1,var3)->{
//(3) 打印一下未确认的消息有哪些
String message = outstanding/confirm/is.get(var1);
System.out.println("未确认的消息是:"+message+":::::未确认的消息tag:"+var1);
};
//准备消息的监听器 监听哪些成功了 哪些消息失败了
channel.addConfirmListener(ackcallback,nackcallback);
//开始时间
long begin = System.currentTimeMillis();
//批量发送
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = "消息" + i ;
channel.basicPublish("",ququeName,null,message.getBytes());
//(1)记录下所有要发送的消息 消息的总和
outstanding/confirm/is.put(channel.getNextPublishSeqNo(),message);
}
//结束时间
long end = System.currentTimeMillis();
System.out.println("发布"+MESSAGE_COUNT+"个异步发布确认消息,耗时"+(end-begin));
}
}
欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)