福建設(shè)備公司網(wǎng)站百度24小時(shí)人工電話(huà)
4.1 NameServer 的功能
????????NameServer 是整個(gè)消息隊(duì)列中的狀態(tài)服務(wù)器,集群的各個(gè)組件通過(guò)它來(lái)了解全局的信息 。 同時(shí),各個(gè)角色的機(jī)器都要定期向 NameServer 上報(bào)自己的狀態(tài),超時(shí)不上報(bào)的話(huà), NameServer 會(huì)認(rèn)為某個(gè)機(jī)器出故障不可用了,其他的組件會(huì)把這個(gè)機(jī)器從可用列表里移除。
????????NamServer 可以部署多個(gè),相互之間獨(dú)立,其他角色同時(shí)向多個(gè) NameServer機(jī)器上報(bào)狀態(tài)信息,從而達(dá)到熱備份的目的。 NameServer 本身是無(wú)狀態(tài)的,也就是說(shuō) NameServer 中的 Broker 、 Topic 等狀態(tài)信息不會(huì)持久存儲(chǔ),都是由各個(gè)角色定時(shí)上報(bào)并存儲(chǔ)到內(nèi)存中的(NameServer 支持配置參數(shù)的持久化, 一般用不到) 。
?????????4.1.1 集群狀態(tài)的存儲(chǔ)結(jié)構(gòu)?
????????org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager有五個(gè)變量RouteInfoManager.javahttps://gitee.com/apache/rocketmq/blob/develop/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
/*** 這個(gè)結(jié)構(gòu)的 Key 是 Topic 的名稱(chēng),它存儲(chǔ)了所有 Topic的屬性信息。* Value 是個(gè) QueueData 隊(duì)列 , 隊(duì)里的長(zhǎng)度等于這個(gè) Topic數(shù)據(jù)存儲(chǔ)的* Master Broker 的個(gè)數(shù), QueueData 里存儲(chǔ)著 Broker 的名稱(chēng)、讀寫(xiě)* queue 的數(shù)量 、 同步標(biāo)識(shí)等*/private final Map<String/* topic */, Map<String, QueueData>> topicQueueTable;/*** 以 BrokerName 為索引 ,相同名稱(chēng)的 Broker 可能存在多臺(tái)機(jī)器, 一個(gè)* Master 和多個(gè) Slave 。 這個(gè)結(jié)構(gòu)存儲(chǔ)著一個(gè) BrokerName 對(duì)應(yīng)的屬性信* 息,包括所屬的 Cluster 名稱(chēng), 一個(gè) Master Broker 和多個(gè) Slave Broker * 的地址信息 。*/private final Map<String/* brokerName */, BrokerData> brokerAddrTable;/*** 存儲(chǔ)的是集群中 Cluster 的信息,結(jié)果很簡(jiǎn)單,就是一個(gè) Cluster 名稱(chēng)對(duì)* 應(yīng)一個(gè)由 BrokerName 組成的集合。*/private final Map<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;/*** 這個(gè)結(jié)構(gòu)和 BrokerAddrTable 有關(guān)系,但是內(nèi)容完全不同,這個(gè)結(jié)構(gòu)的* Key 是 BrokerAddr ,也就是對(duì)應(yīng)著一臺(tái)機(jī)器, BrokerAddrTable 中的 Key* 是 BrokerName , 多個(gè)機(jī)器的 BrokerName 可以相同 。 BrokerLiveTable* 存儲(chǔ)的內(nèi)容是這臺(tái) Broker 機(jī)器的實(shí)時(shí)狀態(tài),包括上次更新?tīng)顟B(tài)的時(shí)間戳, * NameServer 會(huì)定期檢查這個(gè)時(shí)間戳 ,超時(shí)沒(méi)有更新就認(rèn)為這個(gè) Broker 無(wú)效了,* 將其從 Broker 列表里清除。*/private final Map<BrokerAddrInfo/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;/*** FilterServer 是過(guò)濾服務(wù)器,是 RocketMQ 的一種服務(wù)端過(guò)濾方式,一* 個(gè) Broker 可以有 一個(gè)或多個(gè) FilterServer 。 這個(gè)結(jié)構(gòu)的 Key 是 Broker* 的地址, Value 是和這個(gè) Broker 關(guān)聯(lián)的多個(gè) FilterServer 的地址。*/private final Map<BrokerAddrInfo/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
????????4.1.2 狀態(tài)維護(hù)邏輯
????????因?yàn)槠渌巧珪?huì)主動(dòng)向Name Server 上報(bào)狀態(tài),所以 NameServer 的主 要邏 輯在 DefaultRequest -Processor 類(lèi)中,根據(jù)上報(bào)消息里的請(qǐng)求碼做相 應(yīng) 的處理, 更新存儲(chǔ)的對(duì)應(yīng)信息 。 此外,連接斷開(kāi)的 事 件也 會(huì) 觸發(fā)狀態(tài) 更新,具體邏輯在org.apache.rocketmq.namesrv.routeinfo.BrokerHousekeepingService
????????當(dāng) NameServer 和 Broker 的長(zhǎng)連接斷掉以后, onChannelDestroy 函數(shù)會(huì)被調(diào)用,把這個(gè) Broker 的信息清理出去 。
????????NameServer 還有定時(shí)檢查時(shí)間戳的邏輯 , Broker 向 NameServer 發(fā)送的心跳會(huì)更新時(shí)間戳, 當(dāng) NameServer 檢查 到時(shí)間戳長(zhǎng)時(shí)間沒(méi)有更新后,便會(huì)觸發(fā)清理邏輯 。
org.apache.rocketmq.namesrv.NamesrvController 每5s檢查一次
?4.2 各個(gè)角色間的交互流程
????????4.2.1 交互流程源碼分析
????????創(chuàng)建 Topic 的代碼 是 在? org.apache.rocketmq.tools.command.topic?里的UpdateTopicSubCommand 類(lèi)中,創(chuàng)建 Topic 的命令 是?updateTopic
UpdateTopicSubCommand.javahttps://gitee.com/apache/rocketmq/blob/develop/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateTopicSubCommand.java
?????????org.apache.rocketmq.tools.command.topic.UpdateTopicSubCommand#execute 調(diào)用org.apache.rocketmq.tools.admin.DefaultMQAdminExt#createAndUpdateTopicConfig
是向 NameServer 發(fā) 送注 冊(cè) 信 息, NameServer 完 成創(chuàng) 建Topic 的邏輯后,其他客戶(hù)端才能發(fā)現(xiàn)新增 的 Topic ,相關(guān)邏輯在org.apache.rocketmq.namesrv. routeinfo.RouteInfoManager#registerBroker,首先更新 Broker 信息,然后對(duì)每個(gè) Master 角色的 Broker ,創(chuàng)建一個(gè)QueueData 對(duì)象。 如果是新建 Topic ,就是添加 QueueData 對(duì)象;如果是修改Topic ,就是把舊的 QueueData 刪除 , 加入新的 QueueData 。
????????4.2.2 為何不用 ZooKeeper
????????RocketMQ 的架構(gòu)設(shè)計(jì)決定了它不需要進(jìn)行 Master 選舉,用不到這些復(fù)雜的功能,只需要一個(gè)輕量級(jí)的元數(shù)據(jù)服務(wù)器就足夠了 。?5 張圖告訴你 RocketMQ 為什么不使用 Zookeeper 做注冊(cè)中心 - 騰訊云開(kāi)發(fā)者社區(qū)-騰訊云 (tencent.com)https://cloud.tencent.com/developer/article/2118883
4.3 底層通信機(jī)制?
????????4.3.1 Remoting 模塊
????????RocketMQ 的通信相關(guān)代碼在 Remoting 模塊里,先來(lái)看看主要類(lèi)結(jié)構(gòu)?
?
?NettyRemotingClient、NettyRemotingServer而 且 都繼承了NettyRemoting-Abstract 類(lèi) 。通過(guò)上面的封裝 , RocketMQ 各個(gè)模塊間的通信, 可以通過(guò)發(fā)送統(tǒng)一格式的自定義消息 ( RemotingCommand ) 來(lái)完成, 各個(gè)模塊間的通信實(shí)現(xiàn)簡(jiǎn)潔明了 。
????????比如 NameServer 模 塊中, NameServerController 有 一 個(gè) remotingServer 變量 , NameServer 在啟動(dòng)時(shí)初始化各個(gè)變量 , 然后啟 動(dòng) remotingServer 即可,剩下 NameServer 要 做的 是專(zhuān)心實(shí)現(xiàn)處理 RemotingCommand 的邏輯。?
org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#processRequest
?
????????4.3.2 協(xié)議設(shè)計(jì)和編解碼
????????RocketMQ 自己定義了一個(gè)通信協(xié)議,使得模塊間傳輸?shù)亩M(jìn)制消息和有意義 的內(nèi) 容之間互相轉(zhuǎn)換。??
?????????1 )第一部分是大端 4 個(gè)字節(jié)整數(shù),值等于第二、三、 四部分長(zhǎng)度的總和;
????????2 )第二部分是大端 4 個(gè)字節(jié)整數(shù),值等于第三部分的長(zhǎng)度;
????????3 )第三部分是通過(guò) Json 序列化的數(shù)據(jù);
? ? ? ? 4?)第四部分是通過(guò)應(yīng)用自定義二進(jìn)制序列化的數(shù)據(jù)。
????????消息的解碼過(guò)程在 RemotingCommand 的 decode 函數(shù)里org.apache.rocketmq.remoting.protocol.RemotingCommand#decode(io.netty.buffer.ByteBuf)
org.apache.rocketmq.remoting.protocol.RemotingCommand#encode
????????4.3.3 Netty 庫(kù)?
????????RocketMQ 是基于 Netty 庫(kù)來(lái)完成 RemotingServer 和 RemotingClient 具體的通信實(shí)現(xiàn)的, Netty 是個(gè)事件驅(qū)動(dòng)的網(wǎng)絡(luò)編程框架,它屏蔽了 Java Socket 、 NIO等復(fù)雜細(xì)節(jié),用戶(hù)只需用好 Netty ,就可以實(shí)現(xiàn)一個(gè)“ 網(wǎng)絡(luò)編程專(zhuān)家+并發(fā)編程專(zhuān)家”水平的 Server 、 Client 網(wǎng)絡(luò)程序 。 應(yīng)用 Netty 有一定的門(mén)檻,需要了解它的 EventLoopGroup 、 Channel 、 Handler 模型以及各種具體的配置。 RocketMQ利用 Netty 實(shí)現(xiàn)的通信類(lèi)是 NettyRemotingServer 和 NettyRemotingClient ,用戶(hù)也可以參考這兩個(gè)類(lèi)的實(shí)現(xiàn)來(lái)學(xué)習(xí)使用 Netty 。