
- 1.预备 *** 作
- 2. 设置批量发消息的个数
- 3. 编写单个确认代码
- 4. 编写批量确认代码
- 5. 编写异步发布确认代码
- 6. 运行测试代码
2. 设置批量发消息的个数与 RabbitMQ的相关 *** 作2–轮训分发消息中前两步一致,引入相关依赖,并编写创建信道的工具类代码
//批量发消息的个数
public static final int MESSAGE_COUNT = 1000;
3. 编写单个确认代码
//单个确认
public static void publicMessageIndividually() throws IOException, TimeoutException, InterruptedException {
Channel channel = RabbitMqUtils.getChannel();
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName,false,false,false,null);
//开启发布确认
channel./confirm/iSelect();
//开始时间
long begin = System.currentTimeMillis();
//批量发消息
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = i + "";
channel.basicPublish("",queueName,null,message.getBytes());
//单个消息马上进行发布确认
boolean flag = channel.waitFor/confirm/is();
if (flag){
System.out.println("消息发送成功:" + message);
}
}
//结束时间
long end = System.currentTimeMillis();
System.out.println("单个确认耗费时间:"+(end-begin) + "ms");
}
4. 编写批量确认代码
//批量确认
public static void publicMessageBatch() throws IOException, TimeoutException, InterruptedException {
Channel channel = RabbitMqUtils.getChannel();
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName,false,false,false,null);
//开启发布确认
channel./confirm/iSelect();
//开始时间
long begin = System.currentTimeMillis();
//批量确认消息大小
int batchSize = 100;
//批量发消息
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = i + "";
channel.basicPublish("",queueName,null,message.getBytes());
//批量确认,100条确认一次
if ((i+1) % batchSize == 0 && channel.waitFor/confirm/is()){
System.out.println("消息发送成功");
}
}
//结束时间
long end = System.currentTimeMillis();
System.out.println("批量确认耗费时间:"+(end-begin) + "ms");
}
5. 编写异步发布确认代码
//异步发布确认
public static void publicMessageAsync() throws IOException, TimeoutException{
Channel channel = RabbitMqUtils.getChannel();
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName,false,false,false,null);
//开启发布确认
channel./confirm/iSelect();
ConcurrentSkipListMap outstandingConfirms = new ConcurrentSkipListMap<>();
/confirm/iCallback ackCallback = (deliveryTag,mutiple) -> {
if (mutiple){//如果是累计确认,就累计删除。headMap,返回此映射的部分视图,其键值严格小于k.
//2:删除已经确认的消息,剩下的就算未确认的消息
ConcurrentNavigableMap confirmed = outstanding/confirm/is.headMap(deliveryTag);
/confirm/ied.clear();
}else {//累计确认可能会有消息丢失,一般用这个单个确认
outstanding/confirm/is.remove(deliveryTag);
}
System.out.println("确认的消息:"+deliveryTag);
};
/confirm/iCallback nackCallback = (deliveryTag,mutiple) -> {
//3:编写未确认的消息
String message = outstanding/confirm/is.get(deliveryTag);
System.out.println("未确认的消息是:"+message + "--未确认的消息tag:" +deliveryTag);
};
//准备消息的监听器,监听哪些消息成功或失败
channel.add/confirm/iListener(ackCallback,nackCallback); //异步通知
//开始时间
long begin = System.currentTimeMillis();
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = ""+i;
channel.basicPublish("",queueName,null,message.getBytes());
//1:此处记录下所有要发送的消息(序号,信息)
outstanding/confirm/is.put(channel.getNextPublishSeqNo(),message);
}
//结束时间
long end = System.currentTimeMillis();
System.out.println("异步确认耗费时间:"+(end-begin) + "ms");
}
6. 运行测试代码
public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {
//1.单个确认
// publicMessageIndividually(); //单个确认耗费时间:10864ms
//2.批量确认
// publicMessageBatch(); //批量确认耗费时间:168ms
//3.异步批量确认
publicMessageAsync(); //异步确认耗费时间:25ms
}
可以发现,异步确认消耗的时间远小于其他两种
欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)