
- 概念介绍
- 生产者YML配置
- 生产者发送消息
- 消费者监听信息(Push方式的信息)
- 使用@RocketMQMessageListener
- 消费者监听信息(Pull方式的信息)
- 死信队列
示意图:
https://www.cnblogs.com/weifeng1463/p/12889300.html
生产者YML配置
# 名称服务器和生产者组必须配置
rocketmq:
  producer:
    group: my-producer
  name-server: 127.0.0.1:9876
RocketMQAutoConfiguration.java
/**
 * Builds the default producer bean from the bound {@code rocketmq.*} properties.
 *
 * @param rocketMQProperties bound RocketMQ configuration
 * @return a producer configured with the name server, the optional access channel,
 *         and the send/retry/size settings read from the producer section
 */
@Bean(PRODUCER_BEAN_NAME)
@ConditionalOnMissingBean(DefaultMQProducer.class)
@ConditionalOnProperty(prefix = "rocketmq", value = {"name-server", "producer.group"})
public DefaultMQProducer defaultMQProducer(RocketMQProperties rocketMQProperties) {
    RocketMQProperties.Producer settings = rocketMQProperties.getProducer();
    String namesrvAddr = rocketMQProperties.getNameServer();
    String producerGroup = settings.getGroup();

    // Both the name server and the producer group are mandatory.
    Assert.hasText(namesrvAddr, "[rocketmq.name-server] must not be null");
    Assert.hasText(producerGroup, "[rocketmq.producer.group] must not be null");

    String channel = rocketMQProperties.getAccessChannel();

    DefaultMQProducer mqProducer = RocketMQUtil.createDefaultMQProducer(
        producerGroup,
        settings.getAccessKey(),
        settings.getSecretKey(),
        settings.isEnableMsgTrace(),
        settings.getCustomizedTraceTopic());

    mqProducer.setNamesrvAddr(namesrvAddr);
    // The access channel is optional; only apply it when configured.
    if (!StringUtils.isEmpty(channel)) {
        mqProducer.setAccessChannel(AccessChannel.valueOf(channel));
    }

    mqProducer.setSendMsgTimeout(settings.getSendMessageTimeout());
    mqProducer.setRetryTimesWhenSendFailed(settings.getRetryTimesWhenSendFailed());
    mqProducer.setRetryTimesWhenSendAsyncFailed(settings.getRetryTimesWhenSendAsyncFailed());
    mqProducer.setMaxMessageSize(settings.getMaxMessageSize());
    mqProducer.setCompressMsgBodyOverHowmuch(settings.getCompressMessageBodyThreshold());
    mqProducer.setRetryAnotherBrokerWhenNotStoreOK(settings.isRetryNextServer());
    return mqProducer;
}
生产者发送消息
/**
 * Demo REST endpoints that publish messages to {@code my-topic} through {@link RocketMQTemplate}.
 */
@RestController
public class ProducerController {

    /** Topic every demo endpoint publishes to. */
    private static final String TOPIC = "my-topic";

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    /** Sends one one-way message and one one-way orderly message, then prints a line to stdout. */
    @GetMapping("/send1")
    public void send1() {
        // Plain one-way send.
        rocketMQTemplate.sendOneWay(TOPIC, "直接发送信息");
        // Orderly send: the third argument (hashKey) must stay the same so the messages land in
        // one queue; the consumer must also declare
        // @RocketMQMessageListener(consumeMode = ConsumeMode.ORDERLY) to consume them in order.
        rocketMQTemplate.sendOneWayOrderly(TOPIC, "有序信息", "1");
        System.out.println("直接发送信息");
    }

    /** Sends five rounds of a synchronous message followed by a synchronous delayed message. */
    @GetMapping("/send2")
    public void send2() {
        int round = 0;
        while (round < 5) {
            rocketMQTemplate.syncSend(TOPIC, "同步发送信息");
            // Delayed message: send timeout 1000, delay level 3.
            rocketMQTemplate.syncSend(
                TOPIC,
                MessageBuilder.withPayload("同步发送信息" + round).build(),
                1000,
                3);
            round++;
        }
    }

    /** Sends one asynchronous message; the callback prints the send status or the failure. */
    @GetMapping("/send3")
    public void send3() {
        SendCallback callback = new SendCallback() {
            @Override
            public void onSuccess(SendResult result) {
                System.out.println(result.getSendStatus());
            }

            @Override
            public void onException(Throwable cause) {
                cause.printStackTrace();
            }
        };
        rocketMQTemplate.asyncSend(TOPIC, "异步消息", callback);
    }
}
消费者监听信息(Push方式的信息)
– 被动的收取发送过来的信息
使用@RocketMQMessageListener
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
// Marks a class as a push-mode RocketMQ message listener and carries its consumer settings.
// consumerGroup and topic have no default and must always be supplied.
public @interface RocketMQMessageListener {
    /** Placeholder for the {@code rocketmq.name-server} property, with an empty default. */
    String NAME_SERVER_PLACEHOLDER = "${rocketmq.name-server:}";
    /** Placeholder for the {@code rocketmq.consumer.access-key} property, with an empty default. */
    String ACCESS_KEY_PLACEHOLDER = "${rocketmq.consumer.access-key:}";
    /** Placeholder for the {@code rocketmq.consumer.secret-key} property, with an empty default. */
    String SECRET_KEY_PLACEHOLDER = "${rocketmq.consumer.secret-key:}";
    /** Placeholder for the {@code rocketmq.consumer.customized-trace-topic} property, with an empty default. */
    String TRACE_TOPIC_PLACEHOLDER = "${rocketmq.consumer.customized-trace-topic:}";
    /** Placeholder for the {@code rocketmq.access-channel} property, with an empty default. */
    String ACCESS_CHANNEL_PLACEHOLDER = "${rocketmq.access-channel:}";

    /** Consumer group of the listener (required). */
    String consumerGroup();

    /** Topic the listener subscribes to (required). */
    String topic();

    /** How {@link #selectorExpression()} is interpreted; defaults to {@code SelectorType.TAG}. */
    SelectorType selectorType() default SelectorType.TAG;

    /** Selector expression; defaults to {@code "*"}. */
    String selectorExpression() default "*";

    /** Consume mode; defaults to {@code ConsumeMode.CONCURRENTLY}. */
    ConsumeMode consumeMode() default ConsumeMode.CONCURRENTLY;

    /** Message model; defaults to {@code MessageModel.CLUSTERING}. */
    MessageModel messageModel() default MessageModel.CLUSTERING;

    /** Upper bound on consumer threads; defaults to 64. */
    int consumeThreadMax() default 64;

    /** Consume timeout; defaults to 15. NOTE(review): the time unit is not visible here, confirm. */
    long consumeTimeout() default 15L;

    /** Access key; defaults to {@link #ACCESS_KEY_PLACEHOLDER}. */
    String accessKey() default ACCESS_KEY_PLACEHOLDER;

    /** Secret key; defaults to {@link #SECRET_KEY_PLACEHOLDER}. */
    String secretKey() default SECRET_KEY_PLACEHOLDER;

    /** Whether message tracing is enabled; defaults to {@code true}. */
    boolean enableMsgTrace() default true;

    /** Custom trace topic; defaults to {@link #TRACE_TOPIC_PLACEHOLDER}. */
    String customizedTraceTopic() default TRACE_TOPIC_PLACEHOLDER;

    /** Name server address; defaults to {@link #NAME_SERVER_PLACEHOLDER}. */
    String nameServer() default NAME_SERVER_PLACEHOLDER;

    /** Access channel; defaults to {@link #ACCESS_CHANNEL_PLACEHOLDER}. */
    String accessChannel() default ACCESS_CHANNEL_PLACEHOLDER;
}
被该注解标注的类通过实现RocketMQListener接口并重写onMessage方法来接收消息(详情看下方例子)
ListenerContainerConfiguration.java
广播模式下消息不能顺序消费
/**
 * Rejects listener annotations that combine ORDERLY consumption with the BROADCASTING model.
 *
 * @param annotation the listener annotation to check
 * @throws BeanDefinitionValidationException if consumeMode is ORDERLY and messageModel is BROADCASTING
 */
private void validate(RocketMQMessageListener annotation) {
    // Only the ORDERLY + BROADCASTING combination is invalid; anything else passes.
    if (annotation.consumeMode() != ConsumeMode.ORDERLY) {
        return;
    }
    if (annotation.messageModel() != MessageModel.BROADCASTING) {
        return;
    }
    throw new BeanDefinitionValidationException(
        "Bad annotation definition in @RocketMQMessageListener, messageModel BROADCASTING does not support ORDERLY message!");
}
消费者组和主题必须要配置,否则将会初始化失败
// Look up the per-group, per-topic "enabled" flag in the consumer listener configuration.
// A missing group entry falls back to an empty map and a missing topic entry to true,
// so a listener is enabled unless it is explicitly switched off.
boolean listenerEnabled =
(boolean) rocketMQProperties.getConsumer().getListeners().getOrDefault(consumerGroup, Collections.EMPTY_MAP)
.getOrDefault(topic, true);
// Listeners disabled by configuration are logged at debug level and initialization stops here.
if (!listenerEnabled) {
log.debug(
"Consumer Listener (group:{},topic:{}) is not enabled by configuration, will ignore initialization.",
consumerGroup, topic);
return;
}
// Validate the annotation settings before continuing.
validate(annotation);
DefaultRocketMQListenerContainer.java
实现了RocketMQListener或RocketMQReplyListener接口的监听类,可以同时实现RocketMQPushConsumerLifecycleListener接口,重写prepareStart方法来配置消费者属性
// Give the listener a chance to customize the consumer: whichever of the two listeners
// implements RocketMQPushConsumerLifecycleListener gets prepareStart(consumer) called.
// The plain listener is checked first; the reply listener is only used when the first does not match.
if (rocketMQListener instanceof RocketMQPushConsumerLifecycleListener) {
((RocketMQPushConsumerLifecycleListener) rocketMQListener).prepareStart(consumer);
} else if (rocketMQReplyListener instanceof RocketMQPushConsumerLifecycleListener) {
((RocketMQPushConsumerLifecycleListener) rocketMQReplyListener).prepareStart(consumer);
}
例子:(请不要在onMessage方法中捕捉任何异常,因为没有异常代表消费成功)
@Service @RocketMQMessageListener(consumerGroup = "my-consumer", topic = "my-topic") public class Producer1Service implements RocketMQListener消费者监听信息(Pull方式的信息), RocketMQPushConsumerLifecycleListener { private int a = 1; @Override public void onMessage(String message) { System.out.println("======================================"); System.out.println("第" + a + "次接受"); a++; System.out.println("======================================"); System.out.println("consumer1-1:" + message); } @Override public void prepareStart(DefaultMQPushConsumer consumer) { // 设置消费者消息重试次数 consumer.setMaxReconsumeTimes(1); // 设置客户端实例名称 consumer.setInstanceName("Producer1"); } }
– 主动拉取信息
YML配置
# 必须配置名称服务器,消费者组和主题
rocketmq:
  name-server: 127.0.0.1:9876
  consumer:
    group: my-consumer
    topic: my-topic
RocketMQAutoConfiguration.java
/**
 * Builds the default lite pull consumer bean from the bound {@code rocketmq.*} properties.
 *
 * @param rocketMQProperties bound RocketMQ configuration
 * @return a lite pull consumer created from the name server, group, topic, message model,
 *         selector, credentials and pull batch size read from the configuration
 * @throws MQClientException if the consumer cannot be created
 */
@Bean(CONSUMER_BEAN_NAME)
@ConditionalOnMissingBean(DefaultLitePullConsumer.class)
@ConditionalOnProperty(prefix = "rocketmq", value = {"name-server", "consumer.group", "consumer.topic"})
public DefaultLitePullConsumer defaultLitePullConsumer(RocketMQProperties rocketMQProperties)
        throws MQClientException {
    RocketMQProperties.Consumer consumerConfig = rocketMQProperties.getConsumer();
    String nameServer = rocketMQProperties.getNameServer();
    String groupName = consumerConfig.getGroup();
    String topicName = consumerConfig.getTopic();

    // The name server, consumer group and topic are all mandatory.
    Assert.hasText(nameServer, "[rocketmq.name-server] must not be null");
    Assert.hasText(groupName, "[rocketmq.consumer.group] must not be null");
    Assert.hasText(topicName, "[rocketmq.consumer.topic] must not be null");

    String accessChannel = rocketMQProperties.getAccessChannel();
    // The model and selector type are configured as strings and converted to their enum constants.
    MessageModel messageModel = MessageModel.valueOf(consumerConfig.getMessageModel());
    SelectorType selectorType = SelectorType.valueOf(consumerConfig.getSelectorType());
    String selectorExpression = consumerConfig.getSelectorExpression();
    String ak = consumerConfig.getAccessKey();
    String sk = consumerConfig.getSecretKey();
    int pullBatchSize = consumerConfig.getPullBatchSize();

    return RocketMQUtil.createDefaultLitePullConsumer(
        nameServer, accessChannel, groupName, topicName,
        messageModel, selectorType, selectorExpression, ak, sk, pullBatchSize);
}
例子:
/**
 * Demo pull-mode consumer: starts the injected {@link DefaultLitePullConsumer} and
 * prints the body of every polled message to stdout.
 */
@Service
public class Test {

    private final DefaultLitePullConsumer litePullConsumer;

    /**
     * Starts the consumer and begins polling in the background.
     *
     * @param litePullConsumer the pull consumer to start and poll
     * @throws MQClientException if the consumer fails to start
     */
    @Autowired
    public Test(DefaultLitePullConsumer litePullConsumer) throws MQClientException {
        this.litePullConsumer = litePullConsumer;
        System.out.println(litePullConsumer.getPullBatchSize());
        // Start the consumer before the first poll.
        litePullConsumer.start();
        // test() never returns, so calling it directly here would block bean construction
        // (and therefore application startup) forever. Run it on its own thread instead.
        // The thread is a daemon so it does not keep the JVM alive on shutdown.
        Thread poller = new Thread(this::test, "lite-pull-poller");
        poller.setDaemon(true);
        poller.start();
    }

    /**
     * Polls messages in an endless loop and prints each body decoded as UTF-8.
     * The consumer is shut down if the loop is ever left through an exception.
     */
    public void test() {
        try {
            while (true) {
                // Pull the next batch and print each message body.
                litePullConsumer.poll().forEach(message ->
                    System.out.println(
                        new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8)));
            }
        } finally {
            // Release the consumer.
            litePullConsumer.shutdown();
        }
    }
}
死信队列
超过最大重试次数,信息依然没有被消费,该信息会进入一个特殊的队列,即死信队列。
例子:
/**
 * Listener subscribed to {@code %DLQ%my-consumer}, the dead-letter topic of consumer
 * group {@code my-consumer}; prints every dead-lettered message it receives.
 */
@Service
@RocketMQMessageListener(consumerGroup = "my-consumer", topic = "%DLQ%my-consumer")
public class Producer3Service implements RocketMQListener<String> {

    /**
     * Prints the dead-lettered message.
     *
     * @param message message payload
     */
    @Override
    public void onMessage(String message) {
        System.out.println("==================死信队列消费信息:" + message);
    }
}
死信队列默认权限是2(只写),需要改成6(可读可写)。
欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)