
RocketMQ 使用
- 简单的使用
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.nio.charset.Charset;
/**
 * Minimal synchronous RocketMQ producer example.
 *
 * <p>Starts a producer, sends a single UTF-8 text message to the {@code test_mq}
 * topic, prints the send status and message id, and then shuts the producer down.
 */
public class BasicProducer {
    /**
     * Sends one message to the {@code test_mq} topic and prints the result.
     *
     * @param args unused
     * @throws MQClientException if the producer cannot be started or the send fails on the client side
     * @throws RemotingException if communication with the broker fails
     * @throws InterruptedException if the sending thread is interrupted while waiting for the result
     * @throws MQBrokerException if the broker reports an error for the send request
     */
    public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
        // Create the producer; the constructor argument is the producer group name.
        DefaultMQProducer producer = new DefaultMQProducer("basic_producer1");
        // Address of the NameServer the producer connects to.
        producer.setNamesrvAddr("127.0.0.1:9876");
        // Start the producer before sending anything.
        producer.start();
        try {
            // a. Prepare the message: target topic plus the payload bytes.
            Message message = new Message();
            String topic = "test_mq";
            message.setTopic(topic);
            String data = "hello, rocket test send";
            byte[] bytes = data.getBytes(Charset.forName("utf-8"));
            message.setBody(bytes);

            // b. Send synchronously and report the outcome.
            SendResult send = producer.send(message);
            System.out.println("发送状态: " + send.getSendStatus()
                    + ", msgId = " + send.getMsgId());
        } finally {
            // Always release the client's resources, even when the send throws;
            // without this the started producer is never shut down.
            producer.shutdown();
        }
    }
}
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import java.nio.charset.Charset;
import java.util.List;
/**
 * Minimal RocketMQ push-consumer example.
 *
 * <p>Subscribes to every tag of the {@code test_mq} topic and prints the id and
 * UTF-8 decoded body of each message it receives.
 */
public class BasicConsumer {
    /**
     * Configures the consumer, registers the message listener and starts consuming.
     *
     * @param args unused
     * @throws MQClientException if the subscription is invalid or the consumer cannot be started
     */
    public static void main(String[] args) throws MQClientException {
        // Create the consumer; the constructor argument is the consumer group name.
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("basic_consumer1");
        // Address of the NameServer the consumer connects to.
        consumer.setNamesrvAddr("127.0.0.1:9876");
        // Subscribe to the topic; "*" matches every tag.
        consumer.subscribe("test_mq", "*");
        // Register the listener that holds the consumption logic.
        consumer.setMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                try {
                    // Handle every message of the batch, not only the first one,
                    // so nothing is lost if the batch size is configured above 1.
                    for (MessageExt message : msgs) {
                        byte[] body = message.getBody();
                        // Decode the received bytes as UTF-8 text.
                        String s = new String(body, 0, body.length, Charset.forName("utf-8"));
                        // Business logic: print the received string.
                        System.out.println("收到消息: msgId为" + message.getMsgId() + ":" + s);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    // Consumption failed: ask for redelivery instead of acknowledging,
                    // otherwise the failed message would be silently dropped.
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // Start the consumer; messages are delivered to the listener from here on.
        consumer.start();
    }
}
- 延迟消息
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.io.UnsupportedEncodingException;
/**
 * RocketMQ delayed-message producer example.
 *
 * <p>Sends one message to the {@code test_delay} topic with delay level 2 and
 * records the send time in the {@code startTime} user property so a consumer can
 * measure how long delivery took.
 */
public class DelayProducer {
    /**
     * Sends a single delayed message and prints its id and send status.
     *
     * @param args unused
     * @throws MQClientException if the producer cannot be started or the send fails on the client side
     * @throws UnsupportedEncodingException if the {@code utf-8} charset name is not supported
     * @throws RemotingException if communication with the broker fails
     * @throws InterruptedException if the sending thread is interrupted while waiting for the result
     * @throws MQBrokerException if the broker reports an error for the send request
     */
    public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException {
        DefaultMQProducer producer = new DefaultMQProducer("delay_producer");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();
        try {
            String data = "test delay message";
            Message message = new Message();
            message.setTopic("test_delay");
            message.setBody(data.getBytes("utf-8"));
            // Delay: 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
            // Level: 1  2  3   4   5  6  7  8  9  10 11 12 13 14  15  16  17 18
            // Level 2 therefore corresponds to a 5-second delay in the table above.
            message.setDelayTimeLevel(2);
            // Record the send timestamp so the consumer can compute the elapsed time.
            message.putUserProperty("startTime", System.currentTimeMillis() + "");
            SendResult send = producer.send(message);
            System.out.println("发送了: " + send.getMsgId() + ", 发送结果" + send.getSendStatus());
        } finally {
            // Always release the client's resources, even when the send throws;
            // without this the started producer is never shut down.
            producer.shutdown();
        }
    }
}
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import java.io.UnsupportedEncodingException;
import java.util.List;
/**
 * RocketMQ delayed-message consumer example.
 *
 * <p>Subscribes to every tag of the {@code test_delay} topic and, for each message,
 * prints the time elapsed between the {@code startTime} user property written by
 * the sender and the moment the message is consumed.
 */
public class DelayConsumer {
    /**
     * Configures the consumer, registers the message listener and starts consuming.
     *
     * @param args unused
     * @throws MQClientException if the subscription is invalid or the consumer cannot be started
     */
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("delay_consumer");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        // "*" matches every tag of the topic.
        consumer.subscribe("test_delay", "*");
        consumer.setMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                try {
                    // Handle every message of the batch, not only the first one.
                    for (MessageExt message : msgs) {
                        // Send timestamp (epoch millis) stored by the producer.
                        String startTime = message.getUserProperty("startTime");
                        long sendTime = Long.parseLong(startTime);
                        System.out.println("message receive time span: " + (System.currentTimeMillis() - sendTime));
                    }
                } catch (NumberFormatException e) {
                    // The startTime property is missing or not a number. Report it and
                    // request redelivery explicitly rather than letting the exception
                    // escape the listener.
                    e.printStackTrace();
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }
}
- 参考文章
https://www.cnblogs.com/fangyuan303687320/p/5495481.html
https://www.cnblogs.com/SimpleWu/p/12112351.html
欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)