
RabbitMQ-Java-02-工作队列本案例是一个Maven项目假设你已经实现了上一节简单队列官方文档已包含绝大多数本案例内容。请移步:https://docs.spring.io/spring-amqp/docs/current/reference/html/ 核心概念 》原理
执行资源密集型任务时往往有多个队列,每个队列有多个工作线程去处理注意:一个消息必须保证只能被处理一次 *** 作步骤 》搭建环境
idea创建一个空项目创建一个Maven管理的modulepom.xml添加插件:指定JDK编译版本(为了支持lambda表达式,如果不手动添加后期idea报错根据提示会自动添加好)
pom.xml添加依赖:RabbitMQ相关org.apache.maven.plugins maven-compiler-plugin8 8
》工作队列案例(自动应答)com.rabbitmq amqp-client5.13.1 commons-io commons-io2.11.0
说明
主要是两步:提取工具类、分多线程同时处理一个队列 代码组成
RabbitMQ工具类:RabbitMqUtils
package cn.cnyasin.rabbit.utils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class RabbitMqUtils {
public static Channel getChannel() throws Exception {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 配置
factory.setHost("192.168.3.202");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("123456");
factory.setVirtualHost("/");
// 获取连接
Connection connection = factory.newConnection();
// 获取信道
Channel channel = connection.createChannel();
return channel;
}
}
初始化:Initpackage cn.cnyasin.rabbit.worker;
import cn.cnyasin.rabbit.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;
public class Init {
// 交换机名
public static final String EXCHANGE_NAME = "exchange01";
// 队列名
public static final String QUEUE_NAME = "queue01";
// 路由key
public static final String ROUTING_KEY = "routing01";
public static void main(String[] args) throws Exception {
// 获取信道
Channel channel = RabbitMqUtils.getChannel();
// 声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
// 声明队列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 绑定队列、交换机、路由key
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
System.out.println("初始化成功。。。");
System.exit(0);
}
}
工作队列01(线程一):Worker01package cn.cnyasin.rabbit.worker;
import cn.cnyasin.rabbit.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Delivery;
public class Worker01 {
// 定义本工作队列要处理的队列名字
public static final String QUEUE_NAME = "queue01";
public static void main(String[] args) throws Exception {
// 获取信道
Channel channel = RabbitMqUtils.getChannel();
System.out.println("[*] 工作队列01正在等待接收消息。。。");
// 处理消息
channel.basicConsume(
// 队列名
QUEUE_NAME,
// 自动应答
true,
// 成功处理消息回调
(String consumerTag, Delivery message) -> {
// TODO 业务逻辑代码在这里
System.out.println(" [*] 成功处理消息:" + new String(message.getBody()));
},
// 处理消息失败回调
(String consumerTag) -> {
System.out.println("处理消息失败");
}
);
}
}
工作队列02(线程二):Worker02
方式一:将上面Worker01代码复制一份,名字改为Worker02,然后运行方式二:idea支持同一个run方法并行运行:
idea | Run | Edit Configurations | Allow parallel run 生产者:Task
package cn.cnyasin.rabbit.worker;
import cn.cnyasin.rabbit.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import java.util.Scanner;
public class Task {
// 交换机名
public static final String EXCHANGE_NAME = "exchange01";
// 路由key
public static final String ROUTING_KEY = "routing01";
public static void main(String[] args) throws Exception {
// 获取信道
Channel channel = RabbitMqUtils.getChannel();
// 接收控制台输入
Scanner scanner = new Scanner(System.in);
System.out.println("[*] 等待控制台输入消息内容。。。");
while (scanner.hasNext()) {
String input = scanner.next();
if (input.equals("exit")) {
break;
}
// 发送消息
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, input.getBytes());
System.out.println("[*] 消息发送成功。。。");
}
channel.close();
System.out.println("[*] 用户退出。。。");
System.exit(0);
}
}
运行初始化:
Init -> main -> run 开启两个工作队列:
Worker01 -> main -> runWorker02 -> main -> run 运行生产者:
Task -> main -> run 》工作队列案例(手动应答与自动重新入队)
说明
为了防止消息丢失,往往都使用手动应答机制如果消息处理失败,自动重新入队代码还是基于上一节:工作队列案例(自动应答) 代码组成
RabbitMQ工具类:RabbitMqUtils
代码同:工作队列案例(自动应答) 初始化:Init
代码同:工作队列案例(自动应答) 工作队列01(线程一):Worker01
package cn.cnyasin.rabbit.ack;
import cn.cnyasin.rabbit.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Delivery;
public class Worker01 {
// 定义本工作队列要处理的队列名字
public static final String QUEUE_NAME = "queue_ack";
public static void main(String[] args) throws Exception {
// 获取信道
Channel channel = RabbitMqUtils.getChannel();
System.out.println("[*] 工作队列01正在等待接收消息。。。");
// 处理消息
channel.basicConsume(
// 队列名
QUEUE_NAME,
// 自动应答
false,
// 成功处理消息回调
(String consumerTag, Delivery message) -> {
try {
// 沉睡1秒
Thread.sleep(1000 * 1);
} catch (InterruptedException e) {
e.printStackTrace();
}
// TODO 业务逻辑代码在这里
System.out.println(" [*] 工作队列01成功处理消息:" + new String(message.getBody()));
// TODO 手动应答
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
},
// 处理消息失败回调
(String consumerTag) -> {
System.out.println("处理消息失败");
}
);
}
}
工作队列02(线程二):Worker02package cn.cnyasin.rabbit.ack;
import cn.cnyasin.rabbit.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Delivery;
public class Worker02 {
// 定义本工作队列要处理的队列名字
public static final String QUEUE_NAME = "queue_ack";
public static void main(String[] args) throws Exception {
// 获取信道
Channel channel = RabbitMqUtils.getChannel();
System.out.println("[*] 工作队列02正在等待接收消息。。。");
// 处理消息
channel.basicConsume(
// 队列名
QUEUE_NAME,
// 自动应答
true,
// 成功处理消息回调
(String consumerTag, Delivery message) -> {
try {
// 沉睡10秒
Thread.sleep(1000 * 10);
} catch (InterruptedException e) {
e.printStackTrace();
}
// TODO 业务逻辑代码在这里
System.out.println(" [*] 工作队列02成功处理消息:" + new String(message.getBody()));
// TODO 手动应答
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
},
// 处理消息失败回调
(String consumerTag) -> {
System.out.println("处理消息失败");
}
);
}
}
生产者:Task
代码同:工作队列案例(自动应答) 运行初始化:
Init -> main -> run 开启两个工作队列:
Worker01 -> main -> runWorker02 -> main -> run 运行生产者:
Task -> main -> run 》消息持久化
说明
代码借用上一节代码,在此基础上只需要在生产者(Task)代码中进行以下改动
发送消息的时候,第三个参数改为:MessageProperties.PERSISTENT_TEXT_PLAIN,即可使消息持久化存储,即使RabbitMQ宕机消息也不会丢失。 核心代码
// 消息持久化 AMQP.BasicProperties properties = MessageProperties.PERSISTENT_TEXT_PLAIN; // 发送消息 channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, properties, input.getBytes());》不公平分发
说明
代码借用上一节代码,在此基础上只需要在消费者(Worker01、Worker02)代码中进行以下改动
处理消息之前,信道设置一下:channel.basicQos()该函数接收一个int类型值(0=公平分发,大于0=不公平分发,默认0),该值的意思是预取值,代表该工作线程一次取的消息数量
核心代码
// 设置不公平分发 int prefetchCount = 1; // 预取值 channel.basicQos(prefetchCount);备注
该教程部分内容收集自网络,感谢原作者。 附录
无
欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)