
com.rabbitmq amqp-client5.7.3
1.简单模式连接(1个消费者获取一个队列):发布者代码:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Providers {
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接
ConnectionFactory factory = new ConnectionFactory();
//地址
factory.setHost("192.168.17.8");
//用户的【Can access virtual hosts】值
factory.setVirtualHost("/");
//服务器端口
factory.setPort(5672);
//用户名
factory.setUsername("guest");
//密码
factory.setPassword("llh123456");
//获取连接
Connection connection = factory.newConnection();
//创建频道
Channel channel = connection.createChannel();
//申明队列(对列名称,是否持久化,是否独占频道,是否自动销毁,其他参数Map集合)
channel.queueDeclare("First_llh",true,false,false,null);
//设置消息
String massges = "hellow rabbitmq";
//发送消息(对列名称,BasicProperties基础配置,消息的字节数组)
channel.basicPublish("","First_llh",null,massges.getBytes());
//关闭频道
channel.close();
//关闭连接
connection.close();
}
}
接收者代码:
import com.rabbitmq.client.*;
import util.RabbitConnection;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
public class Conmmons {
public static void main(String[] args) throws IOException, TimeoutException {
//获取连接
Connection geust = RabbitConnection.getConection("192.168.17.8", "/", 5672, "guest", "llh123456");
//创建频道
Channel channel = geust.createChannel();
//接受消息
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
//获取消息(delivery.getBody() = [B@4322ce5)然后设置编码UTF_8编码,在转换string类型
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println("massges:[" + message +"]");
};
//对列名称,是否持久化,
channel.basicConsume("First_llh",true,deliverCallback,(consumerTag)->{});
}
}
考虑到创建连接的代码冗余:
写一个连接的工具类
package util;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class RabbitConnection {
public static Connection getConection(String host,String SSL,Integer port,String user,String pwd) throws IOException, TimeoutException {
//创建连接
ConnectionFactory factory = new ConnectionFactory();
//地址
factory.setHost(host);
//用户的【Can access virtual hosts】值
factory.setVirtualHost(SSL);
//服务器端口
factory.setPort(port);
//用户名
factory.setUsername(user);
//密码
factory.setPassword(pwd);
//获取连接
Connection connection = factory.newConnection();
return connection;
}
}
调用工具类获取连接
//获取连接
Connection geust = RabbitConnection.getConection("192.168.17.8", "/", 5672, "guest", "llh123456");
注:接收者启动会监视队列是否发送了新的消息,实时刷新的
2.工作模式连接(2个消费者获取一个队列):接收者1类代码
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import util.RabbitConnection;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
public class ConmmonsTwo {
public static void main(String[] args) throws IOException, TimeoutException {
//获取连接
Connection geust = RabbitConnection.getConection("192.168.17.8", "/", 5672, "guest", "llh123456");
//创建频道
Channel channel = geust.createChannel();
//接受消息
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
//获取消息(delivery.getBody() = [B@4322ce5)然后设置编码UTF_8编码,在转换string类型
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println("massges:[" + message +"]");
};
//对列名称,是否持久化,
channel.basicConsume("First_llh",true,deliverCallback,(consumerTag)->{});
}
}
接收者2类代码
import com.rabbitmq.client.*;
import util.RabbitConnection;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
public class Conmmons {
public static void main(String[] args) throws IOException, TimeoutException {
//获取连接
Connection geust = RabbitConnection.getConection("192.168.17.8", "/", 5672, "guest", "llh123456");
//创建频道
Channel channel = geust.createChannel();
//接受消息
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
//获取消息(delivery.getBody() = [B@4322ce5)然后设置编码UTF_8编码,在转换string类型
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println("massges:[" + message +"]");
};
//对列名称,是否持久化,
channel.basicConsume("First_llh",true,deliverCallback,(consumerTag)->{});
}
}
注:工作模式下对列会均匀分配资源
1.同时运行 接收者1类 和 接收者2类
发送10条数据
读取者1的结果
读取者2的结果
欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)