中文亚洲精品无码_熟女乱子伦免费_人人超碰人人爱国产_亚洲熟妇女综合网

當(dāng)前位置: 首頁(yè) > news >正文

江門(mén)住房城鄉(xiāng)建設(shè)廳網(wǎng)站列舉常見(jiàn)的網(wǎng)絡(luò)營(yíng)銷(xiāo)工具

江門(mén)住房城鄉(xiāng)建設(shè)廳網(wǎng)站,列舉常見(jiàn)的網(wǎng)絡(luò)營(yíng)銷(xiāo)工具,武漢網(wǎng)站建設(shè)公司027,安陸建設(shè)局網(wǎng)站前面分析完Broker啟動(dòng)會(huì)啟動(dòng)RemotingServer服務(wù)同時(shí)會(huì)注冊(cè)Processor處理器,接著分析Producer進(jìn)行消息的發(fā)送,當(dāng)Producer發(fā)送完消息后就得到Broker去接收Producer發(fā)送的消息了。 Producer發(fā)送給Broker消息時(shí)候,發(fā)送的請(qǐng)求code為SEND_MESSAGE(這…

前面分析完Broker啟動(dòng)會(huì)啟動(dòng)RemotingServer服務(wù)同時(shí)會(huì)注冊(cè)Processor處理器,接著分析Producer進(jìn)行消息的發(fā)送,當(dāng)Producer發(fā)送完消息后就得到Broker去接收Producer發(fā)送的消息了。
Producer發(fā)送給Broker消息時(shí)候,發(fā)送的請(qǐng)求code為SEND_MESSAGE(這里在上一章節(jié)有過(guò)分析),根據(jù)消息發(fā)送過(guò)來(lái)的Code,這時(shí)會(huì)調(diào)用NettyRemotingAbstract的processRequestCommand方法,該方法里面會(huì)根據(jù)消息傳輸?shù)腃ode來(lái)取出對(duì)應(yīng)的Processor,進(jìn)入Processor系列類(lèi)的SendMessageProcessor的asyncProcessRequest方法(前面這一部分之前都有過(guò)分析,接下來(lái)我們一起看看后面的操作,正好也將之前的知識(shí)串在一起更有利于理解和記憶)


public CompletableFuture<RemotingCommand> asyncProcessRequest(ChannelHandlerContext ctx,RemotingCommand request) throws RemotingCommandException {final SendMessageContext mqtraceContext;switch (request.getCode()) {// 消息重回隊(duì)列case RequestCode.CONSUMER_SEND_MSG_BACK:return this.asyncConsumerSendMsgBack(ctx, request);default:// 解析消息頭SendMessageRequestHeader requestHeader = parseRequestHeader(request);if (requestHeader == null) {return CompletableFuture.completedFuture(null);}// 構(gòu)建上下文,并調(diào)用處理前鉤子函數(shù)mqtraceContext = buildMsgContext(ctx, requestHeader);this.executeSendMessageHookBefore(ctx, request, mqtraceContext);// 判斷批量消息還是單條消息if (requestHeader.isBatch()) {return this.asyncSendBatchMessage(ctx, request, mqtraceContext, requestHeader);} else {return this.asyncSendMessage(ctx, request, mqtraceContext, requestHeader);}}
}

首先解析消息頭構(gòu)建上下文,處理消息發(fā)送前鉤子函數(shù),最后異步處理消息請(qǐng)求,如果是批量消息調(diào)用asyncSendBatchMessage方法,如果是單條消息調(diào)用asyncSendMessage方法。

處理單條消息 private CompletableFuture<RemotingCommand> asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request,SendMessageContext mqtraceContext,SendMessageRequestHeader requestHeader) {// 準(zhǔn)備響應(yīng)命令對(duì)象final RemotingCommand response = preSend(ctx, request, requestHeader);final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();if (response.getCode() != -1) {return CompletableFuture.completedFuture(response);}final byte[] body = request.getBody();int queueIdInt = requestHeader.getQueueId();TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());if (queueIdInt < 0) {queueIdInt = randomQueueId(topicConfig.getWriteQueueNums());}MessageExtBrokerInner msgInner = new MessageExtBrokerInner();msgInner.setTopic(requestHeader.getTopic());msgInner.setQueueId(queueIdInt);if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) {return CompletableFuture.completedFuture(response);}msgInner.setBody(body);msgInner.setFlag(requestHeader.getFlag());Map<String, String> origProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());MessageAccessor.setProperties(msgInner, origProps);// 時(shí)間msgInner.setBornTimestamp(requestHeader.getBornTimestamp());// 遠(yuǎn)程地址msgInner.setBornHost(ctx.channel().remoteAddress());// 主機(jī)msgInner.setStoreHost(this.getStoreHost());// 重試次數(shù)msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());String clusterName = this.brokerController.getBrokerConfig().getBrokerClusterName();// ...省略CompletableFuture<PutMessageResult> putMessageResult = null;String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);// 事務(wù)消息if (Boolean.parseBoolean(transFlag)) {if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {response.setCode(ResponseCode.NO_PERMISSION);response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()+ "] sending transaction message is forbidden");return CompletableFuture.completedFuture(response);}// 事務(wù)消息的狀態(tài)(后面再分析)putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);} else {// 消息存儲(chǔ)putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);}// 生成結(jié)果返回return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt);}

構(gòu)建MessageExtBrokerInner對(duì)象,設(shè)置相關(guān)屬性執(zhí)行asyncPutMessage方法存儲(chǔ)消息并將結(jié)果返回客戶(hù)端。

創(chuàng)建響應(yīng),驗(yàn)證以及自動(dòng)創(chuàng)建topic


// 準(zhǔn)備響應(yīng),驗(yàn)證以及自動(dòng)創(chuàng)建topic
private RemotingCommand preSend(ChannelHandlerContext ctx, RemotingCommand request,SendMessageRequestHeader requestHeader) {// 準(zhǔn)備響應(yīng)final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);// 設(shè)置唯一idresponse.setOpaque(request.getOpaque());response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId());response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));log.debug("Receive SendMessage request command {}", request);// 獲取broker處理請(qǐng)求服務(wù)的起始時(shí)間final long startTimestamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp();if (this.brokerController.getMessageStore().now() < startTimestamp) {response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimestamp)));return response;}response.setCode(-1);// 驗(yàn)證topic以及自動(dòng)創(chuàng)建邏輯super.msgCheck(ctx, requestHeader, response);return response;
}

this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()用來(lái)判斷是否支持自動(dòng)創(chuàng)建topic,根據(jù)權(quán)限來(lái)判斷如果是不支持自動(dòng)創(chuàng)建就將權(quán)限設(shè)置為可讀可寫(xiě)不可繼承,后面我們?nèi)ヅ袛嗍欠窨梢匀ダ^承,如果能繼承就說(shuō)明支持自動(dòng)創(chuàng)建,這是就會(huì)new一個(gè)TopicConfig,這樣就通過(guò)autoCreateTopicEnable自動(dòng)來(lái)控制是否能夠自動(dòng)創(chuàng)建topic,同時(shí)也會(huì)調(diào)用registerBrokerAll方法注冊(cè)到Broker路由信息里面,當(dāng)然官方建議我們還是不要開(kāi)啟這個(gè)配置因?yàn)樗鼪](méi)有做到壓力的分?jǐn)偂?/p>

存盤(pán) asyncPutMessage方法
根據(jù)topic查詢(xún)對(duì)應(yīng)的路由信息即broker。

public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {   msg.setStoreTimestamp(System.currentTimeMillis());msg.setBodyCRC(UtilAll.crc32(msg.getBody()));AppendMessageResult result = null;StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();String topic = msg.getTopic();final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {// Delay Delivery// 延遲消息轉(zhuǎn)到系統(tǒng)Topic(后面在分析)if (msg.getDelayTimeLevel() > 0) {// ...省略}}// 發(fā)送消息地址InetSocketAddress bornSocketAddress = (InetSocketAddress) msg.getBornHost();if (bornSocketAddress.getAddress() instanceof Inet6Address) {msg.setBornHostV6Flag();}// 存儲(chǔ)消息地址InetSocketAddress storeSocketAddress = (InetSocketAddress) msg.getStoreHost();if (storeSocketAddress.getAddress() instanceof Inet6Address) {msg.setStoreHostAddressV6Flag();}// 更新消息大小PutMessageThreadLocal putMessageThreadLocal = this.putMessageThreadLocal.get();updateMaxMessageSize(putMessageThreadLocal);if (!multiDispatch.isMultiDispatchMsg(msg)) {PutMessageResult encodeResult = putMessageThreadLocal.getEncoder().encode(msg);if (encodeResult != null) {return CompletableFuture.completedFuture(encodeResult);}msg.setEncodedBuff(putMessageThreadLocal.getEncoder().getEncoderBuffer());}PutMessageContext putMessageContext = new PutMessageContext(generateKey(putMessageThreadLocal.getKeyBuilder(), msg));long elapsedTimeInLock = 0;MappedFile unlockMappedFile = null;// 寫(xiě)入CommitLog文件前加鎖,保證文件操作并發(fā)安全putMessageLock.lock(); //spin or ReentrantLock ,depending on store configtry {// 獲取最后一個(gè)mapperFileMappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();this.beginTimeInLock = beginLockTimestamp;msg.setStoreTimestamp(beginLockTimestamp);// 如果不存在或者滿(mǎn)了就創(chuàng)建一個(gè)if (null == mappedFile || mappedFile.isFull()) {mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise}if (null == mappedFile) {log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null));}// 實(shí)際寫(xiě)入CommitLog,在后面追加result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);switch (result.getStatus()) {// 添加成功直接breakcase PUT_OK:break;// 表示當(dāng)前文件存放不下,只保存了一部分case END_OF_FILE:unlockMappedFile = mappedFile;// 創(chuàng)建一個(gè)新的文件mappedFile = this.mappedFileQueue.getLastMappedFile(0);if (null == mappedFile) {// XXX: warn and notify melog.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result));}// 繼續(xù)追加進(jìn)去result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);break;case MESSAGE_SIZE_EXCEEDED:case PROPERTIES_SIZE_EXCEEDED:return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result));case UNKNOWN_ERROR:return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));default:return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));}// 鎖的時(shí)間elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;} finally {beginTimeInLock = 0;putMessageLock.unlock();}if (elapsedTimeInLock > 500) {log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, msg.getBody().length, result);}if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {this.defaultMessageStore.unlockMappedFile(unlockMappedFile);}PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);// StatisticsstoreStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).add(1);storeStatsService.getSinglePutMessageTopicSizeTotal(topic).add(result.getWroteBytes());// 提交刷盤(pán)申請(qǐng)CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, msg);// 提交主從復(fù)制申請(qǐng)CompletableFuture<PutMessageStatus> replicaResultFuture = submitReplicaRequest(result, msg);return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> {if (flushStatus != PutMessageStatus.PUT_OK) {putMessageResult.setPutMessageStatus(flushStatus);}if (replicaStatus != PutMessageStatus.PUT_OK) {putMessageResult.setPutMessageStatus(replicaStatus);}return putMessageResult;});}

首先它會(huì)去處理延時(shí)消息這里我不做過(guò)細(xì)的分析,后面針對(duì)各種消息在來(lái)具體分析,接著就將消息進(jìn)行編碼然后加鎖并寫(xiě)入消息以獲取最后文件進(jìn)行追加的方式來(lái)將消息內(nèi)存文件里面,最后進(jìn)行刷盤(pán)以及通知主從同步的操作。

http://m.risenshineclean.com/news/36651.html

相關(guān)文章:

  • 做五金建材這幾個(gè)網(wǎng)站手機(jī)百度高級(jí)搜索
  • 代做廣聯(lián)達(dá) 的網(wǎng)站淺議網(wǎng)絡(luò)營(yíng)銷(xiāo)論文
  • 鹽田區(qū)網(wǎng)站建設(shè)百度網(wǎng)站ip地址
  • 專(zhuān)門(mén)做資產(chǎn)負(fù)債表結(jié)構(gòu)分析的網(wǎng)站上海市人大常委會(huì)
  • 賓利棋牌在哪個(gè)網(wǎng)站做的廣告成都seo服務(wù)
  • 免費(fèi)的視頻api接口seo排名計(jì)費(fèi)系統(tǒng)
  • 網(wǎng)站中英文轉(zhuǎn)換怎么做軟文標(biāo)題
  • 佛山專(zhuān)業(yè)建設(shè)網(wǎng)站平臺(tái)營(yíng)銷(xiāo)策劃方案怎么寫(xiě)?
  • 網(wǎng)站備案接入商名稱(chēng)seo計(jì)費(fèi)系統(tǒng)開(kāi)發(fā)
  • 廣州白云做網(wǎng)站濟(jì)寧百度推廣電話(huà)
  • 成都網(wǎng)站制作公司成人速成班有哪些專(zhuān)業(yè)
  • 黃驊港開(kāi)發(fā)區(qū)谷歌seo是什么意思
  • 網(wǎng)站建設(shè)演講稿全網(wǎng)營(yíng)銷(xiāo)系統(tǒng)1700元真實(shí)嗎
  • 專(zhuān)門(mén)做化妝的招聘網(wǎng)站品牌營(yíng)銷(xiāo)策略四種類(lèi)型
  • 手機(jī)電影網(wǎng)站怎么做河池網(wǎng)站seo
  • 啟銘網(wǎng)站建設(shè)5118關(guān)鍵詞挖掘工具
  • 石家莊移動(dòng)端網(wǎng)站建設(shè)百度發(fā)布
  • 濰坊網(wǎng)站優(yōu)化sem和seo是什么意思
  • 網(wǎng)站怎么優(yōu)化推廣怎么搜索關(guān)鍵詞
  • 慈溪外貿(mào)公司網(wǎng)站優(yōu)化營(yíng)商環(huán)境的意義
  • 動(dòng)態(tài)網(wǎng)站建設(shè)04章在線(xiàn)測(cè)試關(guān)鍵詞排名優(yōu)化易下拉霸屏
  • 北京12345網(wǎng)上投訴平臺(tái)seo搜索引擎優(yōu)化報(bào)價(jià)
  • 免費(fèi)政府網(wǎng)站html模板百度首頁(yè)快速排名系統(tǒng)
  • 廣西建設(shè)廳網(wǎng)站是什么關(guān)鍵詞優(yōu)化需要從哪些方面開(kāi)展?
  • 宣武網(wǎng)站建設(shè)低價(jià)刷贊網(wǎng)站推廣
  • 網(wǎng)站域名綁定seo搜索優(yōu)化專(zhuān)員招聘
  • 免費(fèi)網(wǎng)絡(luò)翻外墻軟件寧波seo高級(jí)方法
  • 濰坊哪里能找到做網(wǎng)站的seo廣告平臺(tái)
  • 網(wǎng)站建設(shè)服務(wù)提供商搜索引擎排名
  • 網(wǎng)站做的好不好看什么廣州seo