
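创建生产者Producer
pom.xml 依赖:
<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.12</version>
</dependency>
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.14.0</version>
</dependency>
<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>RELEASE</version>
    <scope>compile</scope>
</dependency>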
package a_direct;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.junit.Test;
//消息的生产者
/**
 * Minimal producer: declares the queue {@code hello} and publishes one text message to it
 * through the default (nameless) exchange.
 */
public class Producer {
    /**
     * Connects to the broker, declares the queue, publishes a single message and closes
     * the channel and connection.
     *
     * @throws Exception if connecting, declaring, publishing or closing fails
     */
    @Test
    public void sendMessage() throws Exception {
        // 1. Create the connection factory.
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2. Configure broker address, credentials and virtual host.
        // NOTE(review): host and credentials are hard-coded; consider externalising them.
        connectionFactory.setHost("39.105.127.232");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("sxt");
        connectionFactory.setPassword("123456");
        connectionFactory.setVirtualHost("/v-sxt");

        // 3-4. Open a connection and a channel. try-with-resources closes the channel first,
        // then the connection, even when declaring or publishing throws.
        try (Connection connection = connectionFactory.newConnection();
             Channel channel = connection.createChannel()) {
            // Arguments: queue name, durable, exclusive, autoDelete, extra arguments.
            channel.queueDeclare("hello", true, false, false, null);
            // Encode explicitly as UTF-8 so the payload does not depend on the platform charset.
            channel.basicPublish("", "hello", null,
                    "hello rabbitmq".getBytes(java.nio.charset.StandardCharsets.UTF_8));
        }
        System.out.println("消息发送成功");
    }
}
创建消费者Consumer
package a_direct;
import com.rabbitmq.client.*;
import org.junit.Test;
import java.io.IOException;
//消息的消费者
/**
 * Minimal consumer: subscribes to the queue {@code hello} with automatic acknowledgement and
 * prints every message body it receives.
 */
public class Consumer {
    /**
     * Connects to the broker, starts consuming from {@code hello} and blocks on standard input
     * so the process stays alive while deliveries arrive on the client's own threads.
     *
     * @throws Exception if connecting, declaring, subscribing, reading stdin or closing fails
     */
    @Test
    public void sendMessage() throws Exception {
        // 1. Create the connection factory.
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2. Configure broker address, credentials and virtual host.
        // NOTE(review): host and credentials are hard-coded; consider externalising them.
        connectionFactory.setHost("39.105.127.232");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("sxt");
        connectionFactory.setPassword("123456");
        connectionFactory.setVirtualHost("/v-sxt");

        // 3-4. Open a connection and a channel; both are released when the block exits.
        try (Connection connection = connectionFactory.newConnection();
             Channel channel = connection.createChannel()) {
            // Arguments: queue name, durable, exclusive, autoDelete, extra arguments.
            channel.queueDeclare("hello", true, false, false, null);
            // Second argument 'true' enables automatic acknowledgement.
            channel.basicConsume("hello", true, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
                    // Decode explicitly as UTF-8 instead of relying on the platform charset.
                    System.out.println("消费者接收到消息:"
                            + new String(body, java.nio.charset.StandardCharsets.UTF_8));
                }
            });
            // Keep the program running until something is read from standard input.
            System.in.read();
        }
    }
}
编写工具类优化代码
项目整体视图
package a_direct;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.junit.Test;
import utils.RabbitMQUtils;
//消息的生产者
public class Producer {
@Test
public void sendMessage() throws Exception{
// //1、创建一个连接工厂
// ConnectionFactory connectionFactory=new ConnectionFactory();
// //2、设置相关的参数
// connectionFactory.setHost("39.105.127.232");
// connectionFactory.setPort(5672);
// connectionFactory.setUsername("sxt");//设置用户密码
// connectionFactory.setPassword("123456");
// connectionFactory.setVirtualHost("/v-sxt");//虚拟主机
// //3、从链接工厂里面创建一个链接
// Connection connection = connectionFactory.newConnection();
Connection connection = RabbitMQUtils.getConnection();
//4、创建一个通道
Channel channel = connection.createChannel();
channel.queueDeclare("hello",true,false,false,null);要想消息持久化就要设置上面那行的参数
channel.basicPublish("","hello", MessageProperties.PERSISTENT_TEXT_PLAIN,"hello rabbitmq".getBytes());
//channel.basicPublish("","hello",null,"hello rabbitmq".getBytes());要想消息持久化就要设置上面那行的参数
//7、关闭通道和连接
// channel.close();
// connection.close();
RabbitMQUtils.closeChannelAndConnection(channel,connection);
System.out.println("消息发送成功");
}
}
消费者Consumer
package a_direct;
import com.rabbitmq.client.*;
import org.junit.Test;
import utils.RabbitMQUtils;
import java.io.IOException;
//消息的消费者
/**
 * Consumer that obtains its connection from {@code RabbitMQUtils}, subscribes to the queue
 * {@code hello} with automatic acknowledgement and prints each message body.
 */
public class Consumer {
    /**
     * Starts consuming and blocks on standard input so the process stays alive; the channel and
     * connection are released once the read returns or an error occurs.
     *
     * @throws Exception if creating the channel, declaring, subscribing or reading stdin fails
     */
    @Test
    public void sendMessage() throws Exception {
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = null;
        try {
            channel = connection.createChannel();
            // Arguments: queue name, durable, exclusive, autoDelete, extra arguments.
            channel.queueDeclare("hello", true, false, false, null);
            // Second argument 'true' enables automatic acknowledgement.
            channel.basicConsume("hello", true, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
                    // Decode explicitly as UTF-8 instead of relying on the platform charset.
                    System.out.println("消费者接收到消息:"
                            + new String(body, java.nio.charset.StandardCharsets.UTF_8));
                }
            });
            // Keep the program running until something is read from standard input.
            System.in.read();
        } finally {
            RabbitMQUtils.closeChannelAndConnection(channel, connection);
        }
    }
}
工具类RabbitMQUtils
package utils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
//工具类
/**
 * Helper that owns one pre-configured {@link ConnectionFactory} and offers shortcuts for
 * opening a connection and for closing a channel/connection pair.
 */
public class RabbitMQUtils {
    /** Shared factory, configured once when the class is initialised. */
    private static final ConnectionFactory connectionFactory = new ConnectionFactory();

    static {
        // Broker address, credentials and virtual host.
        // NOTE(review): these values are hard-coded; consider externalising them.
        connectionFactory.setHost("39.105.127.232");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("sxt");
        connectionFactory.setPassword("123456");
        connectionFactory.setVirtualHost("/v-sxt");
    }

    /**
     * Opens a new connection using the shared factory.
     *
     * @return the new connection, or {@code null} when it could not be created (the failure is
     *         printed to standard output); callers must be prepared for {@code null}
     */
    public static Connection getConnection() {
        try {
            return connectionFactory.newConnection();
        } catch (Exception e) {
            System.out.println(e);
            return null;
        }
    }

    /**
     * Closes the channel and then the connection. Either argument may be {@code null}.
     * Failures are printed and not rethrown; the connection is still closed when closing the
     * channel fails.
     *
     * @param channel    channel to close, may be {@code null}
     * @param connection connection to close, may be {@code null}
     */
    public static void closeChannelAndConnection(Channel channel, Connection connection) {
        if (channel != null) {
            try {
                channel.close();
            } catch (Exception e) {
                System.out.println(e);
            }
        }
        // Separate try block: a failure while closing the channel must not leak the connection.
        if (connection != null) {
            try {
                connection.close();
            } catch (Exception e) {
                System.out.println(e);
            }
        }
    }
}
work queue
整体视图
注意:这里的RabbitMQUtils工具类为上种模型所编写
package b_workqueue;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.junit.Test;
import utils.RabbitMQUtils;
//生产者
/**
 * Work-queue producer: publishes 100 numbered messages to the queue {@code hello} through the
 * default exchange.
 */
public class Producer {
    /**
     * Declares the queue, publishes messages 1..100 and releases the channel and connection.
     *
     * @throws Exception if creating the channel, declaring the queue or publishing fails
     */
    @Test
    public void sendMessage() throws Exception {
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = null;
        try {
            // Create the channel.
            channel = connection.createChannel();
            // Declare the queue as non-durable (second argument is false).
            // NOTE(review): the a_direct example declares "hello" as durable; redeclaring an
            // existing queue with different attributes is presumably rejected by the broker —
            // verify, and delete the old queue or use another name if so.
            channel.queueDeclare("hello", false, false, false, null);
            // Publish the messages; encode as UTF-8 rather than the platform charset.
            for (int i = 1; i <= 100; i++) {
                String message = "hello rabbitmq-----workqueue" + i;
                channel.basicPublish("", "hello", null,
                        message.getBytes(java.nio.charset.StandardCharsets.UTF_8));
            }
        } finally {
            // Always release resources, even when publishing failed part-way.
            RabbitMQUtils.closeChannelAndConnection(channel, connection);
        }
        System.out.println("消息发送成功");
    }
}
创建消费者1
package b_workqueue;
import com.rabbitmq.client.*;
import org.junit.Test;
import utils.RabbitMQUtils;
import java.io.IOException;
//消费者
/**
 * Work-queue consumer 1: consumes from the queue {@code hello} with automatic acknowledgement
 * and prints each message body.
 */
public class Consumer1 {
    /**
     * Starts consuming and blocks on standard input; the channel and connection are released
     * when the read returns or an error occurs.
     *
     * @throws Exception if creating the channel, declaring, subscribing or reading stdin fails
     */
    @Test
    public void receiveMessage() throws Exception {
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = null;
        try {
            // Create the channel.
            channel = connection.createChannel();
            // Declare the queue (non-durable).
            channel.queueDeclare("hello", false, false, false, null);
            // Receive messages; 'true' enables automatic acknowledgement.
            channel.basicConsume("hello", true, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
                    // Decode explicitly as UTF-8 instead of relying on the platform charset.
                    System.out.println("消费者【1】接收消息:"
                            + new String(body, java.nio.charset.StandardCharsets.UTF_8));
                }
            });
            System.in.read();
        } finally {
            // Runs even when an exception is thrown above, so nothing leaks.
            RabbitMQUtils.closeChannelAndConnection(channel, connection);
        }
    }
}
创建消费者2
package b_workqueue;
import com.rabbitmq.client.*;
import org.junit.Test;
import utils.RabbitMQUtils;
import java.io.IOException;
//消费者
/**
 * Work-queue consumer 2: consumes from the queue {@code hello} with automatic acknowledgement
 * and prints each message body.
 */
public class Consumer2 {
    /**
     * Starts consuming and blocks on standard input; the channel and connection are released
     * when the read returns or an error occurs.
     *
     * @throws Exception if creating the channel, declaring, subscribing or reading stdin fails
     */
    @Test
    public void receiveMessage() throws Exception {
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = null;
        try {
            // Create the channel.
            channel = connection.createChannel();
            // Declare the queue (non-durable).
            channel.queueDeclare("hello", false, false, false, null);
            // Receive messages; 'true' enables automatic acknowledgement.
            channel.basicConsume("hello", true, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
                    // Decode explicitly as UTF-8 instead of relying on the platform charset.
                    System.out.println("消费者【2】接收消息:"
                            + new String(body, java.nio.charset.StandardCharsets.UTF_8));
                }
            });
            System.in.read();
        } finally {
            // Runs even when an exception is thrown above, so nothing leaks.
            RabbitMQUtils.closeChannelAndConnection(channel, connection);
        }
    }
}
测试
注意测试的时候要先运行消费者然后再运行生产者,消费者平均消费
消息的自动确认机制
当消费者1的消费能力有限时,若关闭自动确认(autoAck=false)、设置 channel.basicQos(1) 并在处理完成后手动调用 basicAck,消息就不再平均分配:消费者1消费不完的消息可由消费者2消费
广播
创建生产者
package c_fanout;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.junit.Test;
import utils.RabbitMQUtils;
//生产者
/**
 * Fanout producer: publishes one message to the fanout exchange {@code logs}.
 */
public class Proudcer {
    /**
     * Declares the exchange, publishes a single message with an empty routing key and releases
     * the channel and connection.
     *
     * @throws Exception if creating the channel, declaring the exchange or publishing fails
     */
    @Test
    public void senMessage() throws Exception {
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = null;
        try {
            // Create the channel.
            channel = connection.createChannel();
            // Declare the exchange.
            channel.exchangeDeclare("logs", BuiltinExchangeType.FANOUT);
            // Publish to the exchange. The payload contains non-ASCII text, so encode it
            // explicitly as UTF-8 instead of relying on the platform charset.
            channel.basicPublish("logs", "", null,
                    "我是一个fanout类型的消息".getBytes(java.nio.charset.StandardCharsets.UTF_8));
        } finally {
            // Always release resources, even when publishing failed.
            RabbitMQUtils.closeChannelAndConnection(channel, connection);
        }
        System.out.println("消息发送成功");
    }
}
创建消费者1
package c_fanout;
import com.rabbitmq.client.*;
import org.junit.Test;
import utils.RabbitMQUtils;
import java.io.IOException;
//消费者1
/**
 * Fanout consumer 1: binds a server-named temporary queue to the exchange {@code logs} and
 * prints every message delivered to it.
 */
public class Consumer1 {
    /**
     * Declares the exchange and a temporary queue, binds them, starts consuming and blocks on
     * standard input; the channel and connection are released afterwards.
     *
     * @throws Exception if any broker operation or the stdin read fails
     */
    @Test
    public void Message() throws Exception {
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = null;
        try {
            // Create the channel.
            channel = connection.createChannel();
            // Declare the exchange.
            channel.exchangeDeclare("logs", BuiltinExchangeType.FANOUT);
            // Ask the broker for a temporary queue with a generated name.
            String queue = channel.queueDeclare().getQueue();
            // Bind the temporary queue to the exchange.
            channel.queueBind(queue, "logs", "");
            // The two-argument basicConsume uses manual acknowledgement, and this handler never
            // acks; request automatic acknowledgement explicitly instead.
            channel.basicConsume(queue, true, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
                    // Decode explicitly as UTF-8 instead of relying on the platform charset.
                    System.out.println("消费者【1】接收到消息是:"
                            + new String(body, java.nio.charset.StandardCharsets.UTF_8));
                }
            });
            System.out.println("消费者【1】启动成功");
            // Keep the program running until something is read from standard input.
            System.in.read();
        } finally {
            RabbitMQUtils.closeChannelAndConnection(channel, connection);
        }
    }
}
创建消费者2
package c_fanout;
import com.rabbitmq.client.*;
import org.junit.Test;
import utils.RabbitMQUtils;
import java.io.IOException;
//消费者2
/**
 * Fanout consumer 2: binds a server-named temporary queue to the exchange {@code logs} and
 * prints every message delivered to it.
 */
public class Consumer2 {
    /**
     * Declares the exchange and a temporary queue, binds them, starts consuming and blocks on
     * standard input; the channel and connection are released afterwards.
     *
     * @throws Exception if any broker operation or the stdin read fails
     */
    @Test
    public void Message() throws Exception {
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = null;
        try {
            // Create the channel.
            channel = connection.createChannel();
            // Declare the exchange.
            channel.exchangeDeclare("logs", BuiltinExchangeType.FANOUT);
            // Ask the broker for a temporary queue with a generated name.
            String queue = channel.queueDeclare().getQueue();
            // Bind the temporary queue to the exchange.
            channel.queueBind(queue, "logs", "");
            // The two-argument basicConsume uses manual acknowledgement, and this handler never
            // acks; request automatic acknowledgement explicitly instead.
            channel.basicConsume(queue, true, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
                    // Decode explicitly as UTF-8 instead of relying on the platform charset.
                    System.out.println("消费者【2】接收到消息是:"
                            + new String(body, java.nio.charset.StandardCharsets.UTF_8));
                }
            });
            System.out.println("消费者【2】启动成功");
            // Keep the program running until something is read from standard input.
            System.in.read();
        } finally {
            RabbitMQUtils.closeChannelAndConnection(channel, connection);
        }
    }
}
测试
路由-直连
创建生产者
package d_routing_direct;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.junit.Test;
import utils.RabbitMQUtils;
//生产者
/**
 * Direct-routing producer: publishes four messages to the direct exchange {@code logs}, one
 * for each of the routing keys {@code info}, {@code warn}, {@code debug} and {@code error}.
 */
public class Proudcer {
    /**
     * Declares the exchange, publishes the four messages and releases the channel and
     * connection.
     *
     * @throws Exception if creating the channel, declaring the exchange or publishing fails
     */
    @Test
    public void senMessage() throws Exception {
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = null;
        try {
            // Create the channel.
            channel = connection.createChannel();
            // Declare the exchange.
            // NOTE(review): the fanout example declares an exchange with the same name "logs"
            // as FANOUT; redeclaring it with another type is presumably rejected by the broker —
            // verify, and delete the old exchange or pick a different name if so.
            channel.exchangeDeclare("logs", BuiltinExchangeType.DIRECT);
            // Publish four messages, each with its own routing key. The payloads contain
            // non-ASCII text, so encode them explicitly as UTF-8.
            channel.basicPublish("logs", "info", null,
                    "我是一个routingkey-Direct类型的消息-info".getBytes(java.nio.charset.StandardCharsets.UTF_8));
            channel.basicPublish("logs", "warn", null,
                    "我是一个routingkey-Direct类型的消息-warn".getBytes(java.nio.charset.StandardCharsets.UTF_8));
            channel.basicPublish("logs", "debug", null,
                    "我是一个routingkey-Direct类型的消息-debug".getBytes(java.nio.charset.StandardCharsets.UTF_8));
            channel.basicPublish("logs", "error", null,
                    "我是一个routingkey-Direct类型的消息-error".getBytes(java.nio.charset.StandardCharsets.UTF_8));
        } finally {
            // Always release resources, even when publishing failed part-way.
            RabbitMQUtils.closeChannelAndConnection(channel, connection);
        }
        System.out.println("消息发送成功");
    }
}
创建消费者1
package d_routing_direct;
import com.rabbitmq.client.*;
import org.junit.Test;
import utils.RabbitMQUtils;
import java.io.IOException;
//消费者1
/**
 * Direct-routing consumer 1: binds a temporary queue to the exchange {@code logs} with the
 * routing keys {@code info}, {@code warn} and {@code debug} and prints each delivered message.
 */
public class Consumer1 {
    /**
     * Declares the exchange and a temporary queue, adds the three bindings, starts consuming
     * and blocks on standard input; the channel and connection are released afterwards.
     *
     * @throws Exception if any broker operation or the stdin read fails
     */
    @Test
    public void Message() throws Exception {
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = null;
        try {
            // Create the channel.
            channel = connection.createChannel();
            // Declare the exchange.
            channel.exchangeDeclare("logs", BuiltinExchangeType.DIRECT);
            // Ask the broker for a temporary queue with a generated name.
            String queue = channel.queueDeclare().getQueue();
            // Bind the temporary queue once per routing key of interest.
            channel.queueBind(queue, "logs", "info");
            channel.queueBind(queue, "logs", "warn");
            channel.queueBind(queue, "logs", "debug");
            // The two-argument basicConsume uses manual acknowledgement, and this handler never
            // acks; request automatic acknowledgement explicitly instead.
            channel.basicConsume(queue, true, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
                    // Decode explicitly as UTF-8 instead of relying on the platform charset.
                    System.out.println("消费者【info】接收到消息是:"
                            + new String(body, java.nio.charset.StandardCharsets.UTF_8));
                }
            });
            System.out.println("消费者【info】启动成功");
            // Keep the program running until something is read from standard input.
            System.in.read();
        } finally {
            RabbitMQUtils.closeChannelAndConnection(channel, connection);
        }
    }
}
创建消费者2
package d_routing_direct;
import com.rabbitmq.client.*;
import org.junit.Test;
import utils.RabbitMQUtils;
import java.io.IOException;
//消费者2
/**
 * Direct-routing consumer 2: binds a temporary queue to the exchange {@code logs} with the
 * routing key {@code error} and prints each delivered message.
 */
public class Consumer2 {
    /**
     * Declares the exchange and a temporary queue, binds them, starts consuming and blocks on
     * standard input; the channel and connection are released afterwards.
     *
     * @throws Exception if any broker operation or the stdin read fails
     */
    @Test
    public void Message() throws Exception {
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = null;
        try {
            // Create the channel.
            channel = connection.createChannel();
            // Declare the exchange.
            channel.exchangeDeclare("logs", BuiltinExchangeType.DIRECT);
            // Ask the broker for a temporary queue with a generated name.
            String queue = channel.queueDeclare().getQueue();
            // Bind the temporary queue for the "error" routing key only.
            channel.queueBind(queue, "logs", "error");
            // The two-argument basicConsume uses manual acknowledgement, and this handler never
            // acks; request automatic acknowledgement explicitly instead.
            channel.basicConsume(queue, true, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
                    // Decode explicitly as UTF-8 instead of relying on the platform charset.
                    System.out.println("消费者【error】接收到消息是:"
                            + new String(body, java.nio.charset.StandardCharsets.UTF_8));
                }
            });
            System.out.println("消费者【error】启动成功");
            // Keep the program running until something is read from standard input.
            System.in.read();
        } finally {
            RabbitMQUtils.closeChannelAndConnection(channel, connection);
        }
    }
}
测试
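通配符 * 匹配恰好一个单词(例如 user.* 匹配 user.insert,但不匹配 user.insert.a)
通配符 # 匹配0个或多个单词(例如 user.# 同时匹配 user.insert 和 user.insert.a)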
package e_routing_topic;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.junit.Test;
import utils.RabbitMQUtils;
//生产者
/**
 * Topic producer: publishes two messages to the topic exchange {@code topic}, with the routing
 * keys {@code user.insert} and {@code user.insert.a}.
 */
public class Proudcer {
    /**
     * Declares the exchange, publishes the two messages and releases the channel and
     * connection.
     *
     * @throws Exception if creating the channel, declaring the exchange or publishing fails
     */
    @Test
    public void senMessage() throws Exception {
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = null;
        try {
            // Create the channel.
            channel = connection.createChannel();
            // Declare the exchange.
            channel.exchangeDeclare("topic", BuiltinExchangeType.TOPIC);
            // Publish two messages, each with its own routing key. The payloads contain
            // non-ASCII text, so encode them explicitly as UTF-8.
            channel.basicPublish("topic", "user.insert", null,
                    "我是一个routingkey-Topic类型的消息-user.insert".getBytes(java.nio.charset.StandardCharsets.UTF_8));
            channel.basicPublish("topic", "user.insert.a", null,
                    "我是一个routingkey-Topic类型的消息-user.insert.a".getBytes(java.nio.charset.StandardCharsets.UTF_8));
        } finally {
            // Always release resources, even when publishing failed part-way.
            RabbitMQUtils.closeChannelAndConnection(channel, connection);
        }
        System.out.println("消息发送成功");
    }
}
消费者1
package e_routing_topic;
import com.rabbitmq.client.*;
import org.junit.Test;
import utils.RabbitMQUtils;
import java.io.IOException;
//消费者1
/**
 * Topic consumer 1: binds a temporary queue to the exchange {@code topic} with the pattern
 * {@code user.*} and prints each delivered message.
 */
public class Consumer1 {
    /**
     * Declares the exchange and a temporary queue, binds them, starts consuming and blocks on
     * standard input; the channel and connection are released afterwards.
     *
     * @throws Exception if any broker operation or the stdin read fails
     */
    @Test
    public void Message() throws Exception {
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = null;
        try {
            // Create the channel.
            channel = connection.createChannel();
            // Declare the exchange.
            channel.exchangeDeclare("topic", BuiltinExchangeType.TOPIC);
            // Ask the broker for a temporary queue with a generated name.
            String queue = channel.queueDeclare().getQueue();
            // Bind the temporary queue with the binding pattern "user.*".
            channel.queueBind(queue, "topic", "user.*");
            // The two-argument basicConsume uses manual acknowledgement, and this handler never
            // acks; request automatic acknowledgement explicitly instead.
            channel.basicConsume(queue, true, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
                    // Decode explicitly as UTF-8 instead of relying on the platform charset.
                    System.out.println("消费者【*】接收到消息是:"
                            + new String(body, java.nio.charset.StandardCharsets.UTF_8));
                }
            });
            System.out.println("消费者【*】启动成功");
            // Keep the program running until something is read from standard input.
            System.in.read();
        } finally {
            RabbitMQUtils.closeChannelAndConnection(channel, connection);
        }
    }
}
消费者2
package e_routing_topic;
import com.rabbitmq.client.*;
import org.junit.Test;
import utils.RabbitMQUtils;
import java.io.IOException;
//消费者2
/**
 * Topic consumer 2: binds a temporary queue to the exchange {@code topic} with the pattern
 * {@code user.#} and prints each delivered message.
 */
public class Consumer2 {
    /**
     * Declares the exchange and a temporary queue, binds them, starts consuming and blocks on
     * standard input; the channel and connection are released afterwards.
     *
     * @throws Exception if any broker operation or the stdin read fails
     */
    @Test
    public void Message() throws Exception {
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = null;
        try {
            // Create the channel.
            channel = connection.createChannel();
            // Declare the exchange.
            channel.exchangeDeclare("topic", BuiltinExchangeType.TOPIC);
            // Ask the broker for a temporary queue with a generated name.
            String queue = channel.queueDeclare().getQueue();
            // Bind the temporary queue with the binding pattern "user.#".
            channel.queueBind(queue, "topic", "user.#");
            // The two-argument basicConsume uses manual acknowledgement, and this handler never
            // acks; request automatic acknowledgement explicitly instead.
            channel.basicConsume(queue, true, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
                    // Decode explicitly as UTF-8 instead of relying on the platform charset.
                    System.out.println("消费者【#】接收到消息是:"
                            + new String(body, java.nio.charset.StandardCharsets.UTF_8));
                }
            });
            System.out.println("消费者【#】启动成功");
            // Keep the program running until something is read from standard input.
            System.in.read();
        } finally {
            RabbitMQUtils.closeChannelAndConnection(channel, connection);
        }
    }
}
测试
两个spring boot项目
生产者测试类
package com.sxt;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
/**
 * Spring Boot test class acting as the message producer; it sends through the injected
 * {@link RabbitTemplate}.
 */
class RabbitmqSpringbootProducerApplicationTests {

    /** Template injected by Spring and used for all sends in this class. */
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /** Prints the injected template, which shows that the context wired it. */
    @Test
    void contextLoads() {
        String description = String.valueOf(rabbitTemplate);
        System.out.println(description);
    }

    /** Sends the text {@code hello world} using the routing key {@code hello}. */
    @Test
    void testHello() {
        final String routingKey = "hello";
        final String payload = "hello world";
        rabbitTemplate.convertAndSend(routingKey, payload);
        System.out.println("消息发送成功");
    }
}
yml配置文件
# Spring Boot configuration for the producer application.
server:
  port: 8001
spring:
  application:
    name: producer
  rabbitmq:
    host: 39.105.127.232
    port: 5672
    # NOTE(review): the plain-client examples log in as "sxt"; confirm "user" is intended here.
    username: user
    password: 123456
    virtual-host: /v-sxt
编写生产者HelloConfig配置
package com.sxt.config;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
/**
 * Producer-side configuration that registers the queue {@code hello} as a Spring bean.
 */
public class HelloConfig {

    /**
     * Creates the queue bean.
     *
     * @return a {@link Queue} named {@code hello}, built with the single-argument constructor;
     *         other constructor overloads accept further queue attributes
     */
    @Bean
    public Queue hello() {
        return new Queue("hello");
    }
}
消费者HelloConsumer
package com.sxt.consumer;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(queuesToDeclare = {@Queue("hello")})
/**
 * Listener for the queue {@code hello}; the listener annotation on this class declares the
 * queue and routes deliveries to the handler method below.
 */
public class HelloConsumer {

    /**
     * Prints the received message.
     *
     * @param body message payload as text
     */
    @RabbitHandler
    public void receive(String body) {
        String line = "消费者收到消息,内容为:" + body;
        System.out.println(line);
    }
}
消费者与生产者的yml配置文件一样,先启动生产者再启动消费者
测试
集成spring boot-work
生产者测试类
@Test
void testWork() {
    // Send ten numbered messages using the routing key "work".
    int sequence = 1;
    while (sequence <= 10) {
        String payload = "hello work--" + sequence;
        rabbitTemplate.convertAndSend("work", payload);
        sequence++;
    }
    System.out.println("消息全部发送成功");
}
生产者配置类WorkConfig
package com.sxt.config;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
/**
 * Producer-side configuration that registers the queue {@code work} as a Spring bean.
 */
public class WorkConfig {

    /**
     * Creates the queue bean.
     *
     * @return a {@link Queue} named {@code work}, built with the single-argument constructor;
     *         other constructor overloads accept further queue attributes
     */
    @Bean
    public Queue work() {
        return new Queue("work");
    }
}
消费者配置类WorkConsumer
package com.sxt.consumer;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
/**
 * Two listeners on the same queue {@code work}; each prints the messages it receives, tagged
 * with its own consumer number.
 */
public class WorkConsumer {

    /**
     * First listener on {@code work}.
     *
     * @param body message payload as text
     */
    @RabbitListener(queuesToDeclare = @Queue("work"))
    public void receive1(String body) {
        report(1, body);
    }

    /**
     * Second listener on {@code work}.
     *
     * @param body message payload as text
     */
    @RabbitListener(queuesToDeclare = @Queue("work"))
    public void receive2(String body) {
        report(2, body);
    }

    /** Prints one received message, prefixed with the consumer number. */
    private void report(int consumerNumber, String body) {
        System.out.println("消费者[" + consumerNumber + "]收到消息,内容为:" + body);
    }
}
测试
欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)