RabbitMq生产者消费者代码

RabbitMq生产者消费者代码,第1张

生产者:
package com.atguigu.util; /**
 * Copyright (c)  牧原 All Rights Reserved
 * 

* Project: Producer * Package: com.atguigu.util * Version: 1.0 *

* Created by songquan on 2022/4/26 上午10:13 */ import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import org.postgresql.core.QueryExecutor; import org.postgresql.util.HostSpec; import java.io.IOException; import java.sql.SQLException; import java.util.Properties; import java.util.concurrent.TimeoutException; /** * @topic: * @desc: * @author: songquan tel:18211850987 * @department:牧原食品-肉食总部数字化部-肉食总部大数据分析与应用科 * @datatime: 2022/4/26 上午10:13 */ public class Producer { //队列名称 public static final String QUEUE_NAME = "data.etl.syn.rs.mdm.company.queue"; // public static final String QUEUE_NAME = "data.etl.syn.rs.mdm.department.queue"; // public static final String QUEUE_NAME = "data.etl.syn.rs.mdm.position.queue"; // public static final String QUEUE_NAME = "data.etl.syn.rs.mdm.rank.queue"; // public static final String QUEUE_NAME = "data.etl.syn.rs.mdm.employee.queue"; // public static final String QUEUE_NAME = "data.etl.syn.rs.mdm.university.queue"; // public static final String QUEUE_NAME = "data.etl.syn.rs.mdm.taginfo.queue"; // public static final String QUEUE_NAME = "data.etl.syn.rs.mdm.district.queue"; // public static final String QUEUE_NAME = "data.etl.syn.rs.mdm.bank.queue"; // public static final String QUEUE_NAME = "data.etl.syn.rs.mdm.category.queue"; // public static final String QUEUE_NAME = "data.etl.syn.rs.mdm.stordoc.queue"; // public static final String QUEUE_NAME = "data.etl.syn.rs.mdm.measdoc.queue"; // public static final String QUEUE_NAME = "data.etl.syn.rs.mdm.mattaxes.queue"; // public static final String QUEUE_NAME = "data.etl.syn.rs.mdm.customer.queue"; // public static final String QUEUE_NAME = "data.etl.syn.rs.mdm.material.queue"; // public static final String QUEUE_NAME = "data.etl.syn.rs.mdm.bankaccbas.queue"; // public static final String QUEUE_NAME = "data.etl.syn.rs.mdm.supplier.queue"; //发消息 public static void main(String[] args) throws IOException, TimeoutException { //创建一个连接工厂,该连接工厂其实就对应着我们访问http://182.92.210.39:15672/网站之后的rabbitmq,从这个工厂里可以获取队列 ConnectionFactory factory = new ConnectionFactory() { public QueryExecutor openConnectionImpl(HostSpec[] hostSpecs, String s, String s1, Properties properties) throws SQLException { return null; } }; //工厂IP连接RabbitMQ的队列 factory.setHost("10.106.11.37"); //用户名 factory.setUsername("admin"); //密码 factory.setPassword("123456"); //创建连接 //Connection connection = factory.newConnection(); Connection connection = factory.newConnection(); //获取信道,通过这个信道可以连接交换机Exchange,然后再连接队列Queue Channel channel = (connection).createChannel(); /** * 生成一个队列,此队列中可以存放消息 * * 1.队列名称 * 2.队列里面的消息是否持久化到磁盘中 * 3.该消息队列是否共享,true表示多个消费者可访问此消息队列,false表示只有一个消费者可访问此消息队列 * 4.是否自动删除,最后一个消费者断开连接以后,该消息队列是否自动删除 * 5.其它参数 * */ channel.queueDeclare(QUEUE_NAME, true, false, false, null); //要发送到hello队列中的消息 // String message = "hello world"; String message = "{\"table_name\":\"md_company\", \"type\":\"insert\", \"time_now\":\"2022-05-09 11:20:10\", \"id\":\"2\", \"tenant_id\":\"2\", \"code\":\"2\", \"name\" :\"2\", \"short_name\":\"2\", \"bank_account\":\"2\", \"bank_name\":\"2\", \"tax_payer_no\":\"2\", \"legal_person\":\"2\", \"tel\":\"2\", \"established_date\":\"2\", \"business_license\":\"2\", \"is_region\":\"2\" , \"is_bm_payment\":\"2\", \"province_code\":\"2\", \"city_code\":\"2\" , \"address\":\"2\" , \"company_type_code\":\"2\", \"is_used\":\"2\", \"create_user\":\"2\", \"create_dept\":\"2\", \"create_time\":\"2022-05-09 11:20:10\", \"update_user\":\"2\", \"update_time\":\"2022-05-09 11:20:18\", \"status\":\"2\", \"is_deleted\":\"2\", \"edge_form_id\":\"2\" , \"data_type\":\"2\",\"exchange_status\":\"2\"}"; /** * 往队列中发送一个消息 * * 1.发送到哪个交换机 * 2.路由的Key值是哪个,本次是队列的名称 * 3.其它参数信息 * 4.发送消息的消息体,需要转换成Byte数组 * */ channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); //如果消息成功的发送到了hello队列中,那么会输出这句代码 System.out.println("消息发送完毕"); } }

消费者:

package com.atguigu.util; /**
 * Copyright (c)  牧原 All Rights Reserved
 * 

* Project: Consumer * Package: com.atguigu.util * Version: 1.0 *

* Created by songquan on 2022/4/26 上午10:30 */ import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @topic: * @desc: * @author: songquan tel:18211850987 * @department:牧原食品-肉食总部数字化部-肉食总部大数据分析与应用科 * @datatime: 2022/4/26 上午10:30 */ public class Consumer { //消费者要获取哪个队列中的消息 public static final String QUEUE_NAME="data.etl.syn.rs.mdm.company.queue"; public static void main(String[] args) throws IOException, TimeoutException { //创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("10.106.11.37"); //用户名 factory.setUsername("admin"); //密码 factory.setPassword("123456"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); //如果能成功接收到消息会调用的回调函数 DeliverCallback deliverCallback=(consumerTag, message)->{ System.out.println(new String(message.getBody())); }; //如果取消从消息队列中获取消息时会调用的回调函数 CancelCallback cancelCallback= consumerTag->{ System.out.println("消息消费被中断"); }; /** * 消费者消费消息,也即是消费者从消息队列中取消息 * * 1.消费哪个队列 * 2.消费成功之后是否要自动应答,true代表的是自动应答,false代表的是手动应答 * 3.消费者成功消费的回调 * 4.消费者取消消费的回调 * */ channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback); } }

欢迎分享,转载请注明来源:内存溢出

原文地址:https://54852.com/langs/905428.html

(0)
打赏 微信扫一扫微信扫一扫 支付宝扫一扫支付宝扫一扫
上一篇 2022-05-15
下一篇2022-05-15

发表评论

登录后才能评论

评论列表(0条)

    保存