全站最硬核 百万字强肝RocketMq源码 火热更新中~(九十八)延时队列

全站最硬核 百万字强肝RocketMq源码 火热更新中~(九十八)延时队列,第1张

全站最硬核 百万字强肝RocketMq源码 火热更新中~(九十八)延时队列

参看源码:

org.apache.rocketmq.store.schedule.ScheduleMessageService.DeliverDelayedMessageTimerTask#messageTimeup

private MessageExtBrokerInner messageTimeup(MessageExt msgExt) {
    MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
    msgInner.setBody(msgExt.getBody());
    msgInner.setFlag(msgExt.getFlag());
    MessageAccessor.setProperties(msgInner, msgExt.getProperties());

    TopicFilterType topicFilterType = MessageExt.parseTopicFilterType(msgInner.getSysFlag());
    long tagsCodevalue =
        MessageExtBrokerInner.tagsString2tagsCode(topicFilterType, msgInner.getTags());
    msgInner.setTagsCode(tagsCodevalue);
    msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));

    msgInner.setSysFlag(msgExt.getSysFlag());
    msgInner.setBornTimestamp(msgExt.getBornTimestamp());
    msgInner.setBornHost(msgExt.getBornHost());
    msgInner.setStoreHost(msgExt.getStoreHost());
    msgInner.setReconsumeTimes(msgExt.getReconsumeTimes());

    msgInner.setWaitStoreMsgOK(false);
    MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_DELAY_TIME_LEVEL);

    msgInner.setTopic(msgInner.getProperty(MessageConst.PROPERTY_REAL_TOPIC));

    String queueIdStr = msgInner.getProperty(MessageConst.PROPERTY_REAL_QUEUE_ID);
    int queueId = Integer.parseInt(queueIdStr);
    msgInner.setQueueId(queueId);

    return msgInner;
}

第五步:将消息投递目标Topic中

这一步与第二步类似,不过由于消息的Topic名称已经改为了目标Topic。因此消息会直接投递到目标Topic的ConsumeQueue中,之后消费者即消费到这条消息。

欢迎分享,转载请注明来源:内存溢出

原文地址:https://54852.com/zaji/5715525.html

(0)
打赏 微信扫一扫微信扫一扫 支付宝扫一扫支付宝扫一扫
上一篇 2022-12-17
下一篇2022-12-18

发表评论

登录后才能评论

评论列表(0条)

    保存