「MQ」RabbitMQ路由模式讲解

「MQ」RabbitMQ路由模式讲解,第1张

「MQ」RabbitMQ路由模式讲解 RabbitMQ路由模式

相关视频参考(来自动力节点):https://www.bilibili.com/video/BV1Ap4y1D7tU

相关资料下载:http://www.bjpowernode.com/?csdn

生产者
package rabbitmq.routing;

import java.util.Random;

import java.util.Scanner;



import com.rabbitmq.client.BuiltinExchangeType;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;



public class Test1 {

public static void main(String[] args) throws Exception {

String[] a = {"warning", "info", "error"};



ConnectionFactory f = new ConnectionFactory();

f.setHost("192.168.64.140");

f.setPort(5672);

f.setUsername("admin");

f.setPassword("admin");



Connection c = f.newConnection();

Channel ch = c.createChannel();



//参数1: 交换机名

//参数2: 交换机类型

ch.exchangeDeclare("direct_logs", BuiltinExchangeType.DIRECT);



while (true) {

System.out.print("输入消息: ");

String msg = new Scanner(System.in).nextLine();

if ("exit".equals(msg)) {

break;

}



//随机产生日志级别

String level = a[new Random().nextInt(a.length)];



//参数1: 交换机名

//参数2: routingKey, 路由键,这里我们用日志级别,如"error","info","warning"

//参数3: 其他配置属性

//参数4: 发布的消息数据

ch.basicPublish("direct_logs", level, null, msg.getBytes());

System.out.println("消息已发送: "+level+" - "+msg);



}



c.close();

}

}
消费者
package rabbitmq.routing;

import java.io.IOException;

import java.util.Scanner;



import com.rabbitmq.client.BuiltinExchangeType;

import com.rabbitmq.client.CancelCallback;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;

import com.rabbitmq.client.DeliverCallback;

import com.rabbitmq.client.Delivery;



public class Test2 {

public static void main(String[] args) throws Exception {

ConnectionFactory f = new ConnectionFactory();

f.setHost("192.168.64.140");

f.setUsername("admin");

f.setPassword("admin");

Connection c = f.newConnection();

Channel ch = c.createChannel();



//定义名字为 direct_logs 的交换机, 它的类型是 "direct"

ch.exchangeDeclare("direct_logs", BuiltinExchangeType.DIRECT);



//自动生成对列名,

//非持久,独占,自动删除

String queueName = ch.queueDeclare().getQueue();



System.out.println("输入接收的日志级别,用空格隔开:");

String[] a = new Scanner(System.in).nextLine().split("\s");



//把该队列,绑定到 direct_logs 交换机

//允许使用多个 bindingKey

for (String level : a) {

ch.queueBind(queueName, "direct_logs", level);

}



System.out.println("等待接收数据");



//收到消息后用来处理消息的回调对象

DeliverCallback callback = new DeliverCallback() {

@Override

public void handle(String consumerTag, Delivery message) throws IOException {

String msg = new String(message.getBody(), "UTF-8");

String routingKey = message.getEnvelope().getRoutingKey();

System.out.println("收到: "+routingKey+" - "+msg);

}

};



//消费者取消时的回调对象

CancelCallback cancel = new CancelCallback() {

@Override

public void handle(String consumerTag) throws IOException {

}

};



ch.basicConsume(queueName, true, callback, cancel);

}

}

欢迎分享,转载请注明来源:内存溢出

原文地址:https://54852.com/zaji/5678892.html

(0)
打赏 微信扫一扫微信扫一扫 支付宝扫一扫支付宝扫一扫
上一篇 2022-12-17
下一篇2022-12-17

发表评论

登录后才能评论

评论列表(0条)

    保存