
-
Work Queues:与入门程序的简单模式相比,多了一个或一些消费端,多个消费端共同消费同一个队列中的消息。
-
应用场景:对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度。
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer_WorkQueues {
public static void main(String[] args) throws IOException, TimeoutException {
// 1、创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2、设置参数
factory.setHost("192.168.16.62"); // ip 默认值 localhost
factory.setPort(5672); // 端口 默认值 5672
factory.setVirtualHost("/"); //虚拟机 默认值 /
factory.setUsername("guest"); // 用户名 默认值 guest
factory.setPassword("guest"); // 密码 默认值 guest
// 3、创建连接 Connection
Connection connection = factory.newConnection();
// 4、创建Channel
Channel channel = connection.createChannel();
// 5、创建队列
//如果没有名称为 hello_world 的队列,则会创建
channel.queueDeclare("work_queues", true, false, false, null);
// 6、发送消息
for (int i = 0; i < 10; i++) {
String body = i + "hello world";
channel.basicPublish("","work_queues",null,body.getBytes());
}
// 7、释放资源
channel.close();
connection.close();
}
}
消费者 consumer:
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer_WorkQueues1 {
public static void main(String[] args) throws IOException, TimeoutException {
// 1、创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2、设置参数
// ip 默认值 localhost, 192.168.16.62 :启动MQ服务器的ip
factory.setHost("192.168.16.62");
factory.setPort(5672); // 端口 默认值 5672
factory.setVirtualHost("/"); //虚拟机 默认值 /
factory.setUsername("guest"); // 用户名 默认值 guest
factory.setPassword("guest"); // 密码 默认值 guest
// 3、创建连接 Connection
Connection connection = factory.newConnection();
// 4、创建Channel
Channel channel = connection.createChannel();
// 5、创建队列
//如果没有名称为 hello_world 的队列,则会创建
channel.queueDeclare("work_queues", true, false, false, null);
//6、接收消息
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// System.out.println("consumerTag : "+consumerTag);
// System.out.println("envelope : "+envelope);
// System.out.println("properties : "+properties);
System.out.println("body : "+ new String(body));
}
};
channel.basicConsume("work_queues",true,consumer);
}
}
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer_WorkQueues2 {
public static void main(String[] args) throws IOException, TimeoutException {
// 1、创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2、设置参数
// ip 默认值 localhost, 192.168.16.62 :启动MQ服务器的ip
factory.setHost("192.168.16.62");
factory.setPort(5672); // 端口 默认值 5672
factory.setVirtualHost("/"); //虚拟机 默认值 /
factory.setUsername("guest"); // 用户名 默认值 guest
factory.setPassword("guest"); // 密码 默认值 guest
// 3、创建连接 Connection
Connection connection = factory.newConnection();
// 4、创建Channel
Channel channel = connection.createChannel();
// 5、创建队列
//如果没有名称为 hello_world 的队列,则会创建
channel.queueDeclare("work_queues", true, false, false, null);
//6、接收消息
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// System.out.println("consumerTag : "+consumerTag);
// System.out.println("envelope : "+envelope);
// System.out.println("properties : "+properties);
System.out.println("body : "+ new String(body));
}
};
channel.basicConsume("work_queues",true,consumer);
}
}
结果:
小结:
-
在一个队列中如果有多个消费者,那么消费者之间对于同一个消息的关系是竞争的关系。
-
Work Queues 对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度。例如:短信服务部署多个,只需要有一个节点成功发送即可。
欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)