
本次需求需要同步两库中的company表。
业务代码略过。。。
rabbitmq创建交换机
绑定队列
创建队列
2.发布者代码
@Autowired
RabbitTemplate rabbitTemplate;
@Override
public void sendMessageCompany(JSONObject messageVo, String routingkey) {
// 交换机名称 绑定的routingkey 消息
rabbitTemplate.convertAndSend("company", "hello", messageVo);
}
3.消费者代码
初始化通道
package com.gold.mtmc.rabbitmq.config;
import com.gold.mtmc.common.contants.enums.RabbitMqEnum;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class HelloExchangeConfig {
private String EXCHANGE = "hello";
private String QUEUE = "helloqueue";
private String RoutingKey = "message_hello";
@Bean
public DirectExchange helloExchange() {
DirectExchange directExchange = new DirectExchange(EXCHANGE, true, false);
return directExchange;
}
@Bean
public Queue helloQueueMsg() {
Queue queue = new Queue(QUEUE, true, false, false);
return queue;
}
@Bean
public Binding helloBindingQueueMsg() {
Binding binding = BindingBuilder.bind(helloQueueMsg()).to(helloExchange()).with(RoutingKey);
return binding;
}
}
@Slf4j
@Component
public class DirectMsgListener {
@Autowired
private MsgListener msgListener;
//指定队列名称
@RabbitListener(queues = "fanout")
public void displayMailFanout(JSONObject messageVo, Channel channel, Message message) throws IOException {
//回调处理消息
try {
log.info("directMsg队列监听器收到消息开始:" + messageVo.toString());
//调用发送消息接口
msgListener.send(messageVo);
log.info("directMsg队列监听器收到消息结束:" + messageVo.toString());
} catch (Exception e) {
log.info("***********************************************发送消息失败:"+e.getMessage());
e.printStackTrace();
log.info("***********************************************发送消息失败:"+e);
} finally {
log.info("directMsg队列监听器收到消息结束:" + messageVo.toString() + message.getMessageProperties().getDeliveryTag());
//这段代码表示,这次消息,我已经接受并消费掉了,不会再重复发送消费
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
}
欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)