
RabbitMQ本身并不提供延迟队列的功能,但是我们仍然可以使用RabbitMQ的 TTL(Time-To-Live) 和 DLX(Dead Letter Exchanges) 这两个扩展特性来实现延迟队列,实现消息的延迟消费和延迟重试的功能。
实现结果-
固定时间延迟消费
-
指定时间消费
package com.itdfq.delay.config;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitConfig {
@Value("${spring.rabbitmq.addresses}")
private String address;
@Value("${spring.rabbitmq.port}")
private String port;
@Value("${spring.rabbitmq.username}")
private String username;
@Value("${spring.rabbitmq.password}")
private String password;
@Value("${spring.rabbitmq.virtual-host}")
private String virtualHost;
//连接工厂
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setAddresses(address + ":" + port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost(virtualHost);
//TODO 消息发送确认--回调
// connectionFactory.setPublisherConfirms(true);
return connectionFactory;
}
//RabbitAdmin类封装对RabbitMQ的管理 *** 作
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
return new RabbitAdmin(connectionFactory);
}
//使用Template
@Bean
public RabbitTemplate newRabbitTemplate() {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
//设置监听确认mq(交换器)接受到信息
rabbitTemplate.set/confirm/iCallback(/confirm/iCallback());
//添加监听 失败鉴定(路由没有收到)
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnCallback(returnCallback());
return rabbitTemplate;
}
/
@Bean
public Queue DelayQueue() {
Map params = new HashMap<>();
// x-dead-letter-exchange 声明了队列里的死信转发到的DLX名称,
params.put("x-dead-letter-exchange", RabbitMqConstant.IMMEDIATE_EXCHANGE);
// x-dead-letter-routing-key 声明了这些死信在转发时携带的 routing-key 名称。
params.put("x-dead-letter-routing-key", RabbitMqConstant.IMMEDIATE_ROUTING_KEY);
// x-message-ttl 声明该队列死信可存活时间
params.put("x-message-ttl", RabbitMqConstant.DELAY_TIME);
return new Queue(RabbitMqConstant.DELAY_QUEUE, true, false, false, params);
}
设置立即消费监听
package com.itdfq.delay.message.listen;
import com.itdfq.delay.constant.RabbitMqConstant;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class DelayListenConfig {
@Autowired
private RabbitAdmin rabbitAdmin;
@Autowired
private ConnectionFactory connectionFactory;
@Bean
public DirectExchange Exchange() {
DirectExchange exchange = new DirectExchange(
RabbitMqConstant.IMMEDIATE_EXCHANGE, true, false);
exchange.setAdminsThatShouldDeclare(rabbitAdmin);
return exchange;
}
@Bean
public Queue Queue() {
Queue queue = new Queue(RabbitMqConstant.IMMEDIATE_QUEUE, true, false, false);
queue.setAdminsThatShouldDeclare(rabbitAdmin);
return queue;
}
@Bean
public Binding subscribeNotifyBinding() {
Binding binding = BindingBuilder.bind(Queue()).to(Exchange())
.with(RabbitMqConstant.IMMEDIATE_ROUTING_KEY);
binding.setAdminsThatShouldDeclare(rabbitAdmin);
return binding;
}
@Bean
public SimpleMessageListenerContainer container(
@Qualifier(value = "delayRabbitmqListener") DelayRabbitmqListener delayRabbitmqListener) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueues(Queue());
container.setMessageListener(delayRabbitmqListener);
container.setDefaultRequeueRejected(false);
//手动提交
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
//设置消费者ack消息的模式,默认是自动,此处设置为手动
// container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
return container;
}
}
指定时间(延时消费) 设置延时队列
@Bean
public Queue variableDelayQueue() {
Map params = new HashMap<>();
// x-dead-letter-exchange 声明了队列里的死信转发到的DLX名称,
params.put("x-dead-letter-exchange", RabbitMqConstant.IMMEDIATE_EXCHANGE);
// x-dead-letter-routing-key 声明了这些死信在转发时携带的 routing-key 名称。
params.put("x-dead-letter-routing-key", RabbitMqConstant.IMMEDIATE_ROUTING_KEY);
return new Queue(RabbitMqConstant.DELAY_VARIABLE_QUEUE_KEY, true, false, false, params);
}
消费者
public void send(String msg,Integer expiration){
rabbitTemplate.convertAndSend(RabbitMqConstant.DELAY_VARIABLE_EXCHANGE_KEY,
RabbitMqConstant.DELAY_VARIABLE_ROUTING_KEY, msg,
message -> {
log.info("可变延时消费发送消息: {}, and expiration in {}ms", msg, expiration);
message.getMessageProperties().setExpiration(expiration.toString());
return message;
});
}
欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)