小語種網(wǎng)站寧德seo培訓(xùn)
RocketMQ 延遲消息
RocketMQ 消費(fèi)者啟動(dòng)流程
什么是延遲消息
RocketMQ 延遲消息是指,生產(chǎn)者發(fā)送消息給消費(fèi)者消息,消費(fèi)者需要等待一段時(shí)間后才能消費(fèi)到。
使用場景
用戶下單之后,15分鐘未支付,對支付賬單進(jìn)行提醒或者關(guān)單處理。
RocketMQ 開源版本的消息不支持任意時(shí)間精度,只支持5s 10s 1m
等等。
Broker 如何處理延遲消息
消息投遞如下:
- 生產(chǎn)者發(fā)送一個(gè)延遲消息到一個(gè)
topic
中 - Broker 判斷是個(gè)延遲消息后,將消息暫存
- Broker 通過延遲服務(wù), 先檢查消息是否過期,如果到期將消息投遞到目標(biāo)
topic
- 消費(fèi)者消費(fèi)
topic
中的投遞延遲消息。
開源RocketMQ 的消息不支持任意精度,默認(rèn)支持 18個(gè) level:
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
Broker 在啟動(dòng)的時(shí)候,會(huì)創(chuàng)建一個(gè)內(nèi)部 topic:“SCHEDULE_TOPIC_XXXX” 根據(jù)延遲 level 數(shù)量,創(chuàng)建對應(yīng)數(shù)量的 隊(duì)列。 也就是說 18 level 對應(yīng)了18 個(gè)隊(duì)列。
具體可以在 代碼TopicConfigManager.java 中 看到:
private static final int SCHEDULE_TOPIC_QUEUE_NUM = 18;
要注意的是,Broker 一般是集群模式
部署,也就是說,每個(gè)Broker 都會(huì)有18個(gè)隊(duì)列。
TopicConfigManager#TopicConfigManager(BrokerController brokerController)
生產(chǎn)者消息延遲發(fā)送
代碼示例如下:
Message msg=new Message();
msg.setTopic("TopicA");
msg.setTags("Tag");
msg.setBody("this is a delay message".getBytes());
//設(shè)置延遲level為5,對應(yīng)延遲1分鐘
msg.setDelayTimeLevel(5);
producer.send(msg);
Broker 存儲(chǔ)延遲消息
上一篇文章已經(jīng)談到,Broker
收到消費(fèi)者消息后,會(huì)進(jìn)行消息存儲(chǔ),然后再轉(zhuǎn)發(fā)到消費(fèi)隊(duì)列(ConsumerQueue),然后再推給消費(fèi)者。
其實(shí)一旦消息轉(zhuǎn)發(fā)到
存儲(chǔ)延遲消息的流程也類似
- 確定延遲消息投遞到topic 哪個(gè)隊(duì)列。存儲(chǔ)生產(chǎn)者寫入的消息時(shí),將消息轉(zhuǎn)發(fā)到 ConsumeQueue 中,消費(fèi)者就能消費(fèi)到。 延遲消息不能立即消息到,于是將 topic 名稱修改為
SCHEDULE_TOPIC_XXX
,并根據(jù)延遲消息級別,確定投遞到哪個(gè)隊(duì)列上。同時(shí)還會(huì)將原來消息要發(fā)送到的目標(biāo) topic 和隊(duì)列記錄投遞到哪個(gè)隊(duì)列。
代碼在CommitLog#asyncPutMessage 中
設(shè)置延遲消息的投遞隊(duì)列信息代碼如下:
// Delay Deliveryif (msg.getDelayTimeLevel() > 0) {// 如果設(shè)置的級別超過了最大級別,重置延遲級別if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());}
// 計(jì)算延遲消息應(yīng)該投遞到 SCHEDULE_TOPIC_XXXX 到哪個(gè)隊(duì)列。topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;int queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());// Backup real topic, queueId// 記錄原始 topic ,queueid,方便后期投遞到目標(biāo) topicMessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
// 更新消息投遞目標(biāo)為 SCHEDULE_TOPIC_XXX,queueIdmsg.setTopic(topic);msg.setQueueId(queueId);}
消息轉(zhuǎn)發(fā)
消息轉(zhuǎn)發(fā)過程其實(shí)中會(huì)對延遲消息做一些特殊處理
CommitLog中的消息轉(zhuǎn)發(fā)到CosumeQueue中是異步進(jìn)行的。在轉(zhuǎn)發(fā)過程中,會(huì)對延遲消息進(jìn)行特殊處理,主要是計(jì)算這條延遲消息需要在什么時(shí)候進(jìn)行投遞。