中文亚洲精品无码_熟女乱子伦免费_人人超碰人人爱国产_亚洲熟妇女综合网

當(dāng)前位置: 首頁 > news >正文

佛山網(wǎng)站建設(shè)是哪個好東莞谷歌推廣

佛山網(wǎng)站建設(shè)是哪個好,東莞谷歌推廣,福州論壇網(wǎng),民治營銷網(wǎng)站造成重復(fù)消費的原因: MQ向消費者推送message,消費者向MQ返回ack,告知所推送的消息消費成功。但是由于網(wǎng)絡(luò)波動等原因,可能造成消費者向MQ返回的ack丟失。MQ長時間(一分鐘)收不到ack,于是會向消…

造成重復(fù)消費的原因:

MQ向消費者推送message,消費者向MQ返回ack,告知所推送的消息消費成功。但是由于網(wǎng)絡(luò)波動等原因,可能造成消費者向MQ返回的ack丟失。MQ長時間(一分鐘)收不到ack,于是會向消費者再次推送該條message,這樣就造成了重復(fù)消費。

解決重復(fù)消費的辦法:

用存儲(redis或者mysql)記錄一下已經(jīng)消費的message的id,當(dāng)message被消費前先去存儲中查一下消費記錄,沒有該條message的id則正常消費返回ack,有該條message的id的話不用消費直接返回ack給MQ。

當(dāng)然實際生產(chǎn)中的話選用redis是比較好的選擇,畢竟查mysql要進行磁盤IO,效率要低得多,而且絕大多數(shù)重復(fù)消費都是由于MQ沒有收到消費者的ack于是造成MQ再次向消費者進行同一條message的投遞。所以message的消費記錄其實我們并不需要一直記錄,只需要保存一段時間,當(dāng)下次投遞過來的時候消費者能查到消費記錄然后準(zhǔn)確返回ack給MQ就行。

yml

#配置rabbitMq 服務(wù)器rabbitmq:host: xxxx#rabbitmq相關(guān)配置 15672是Web管理界面的端口;5672是MQ訪問的端口port: xxxxusername: xxxxpassword: xxxx#虛擬host 可以不設(shè)置,使用server默認hostvirtual-host: xxxxconnection-timeout: 0#確認消息已發(fā)送到隊列(Queue)publisher-returns: true #確認消息已發(fā)送到交換機(Exchange)publisher-confirm-type: correlated# 設(shè)置消費端手動 acklistener:simple:retry:# 開啟消費者(程序出現(xiàn)異常的情況下)進行重試enabled: true#重試間隔時間max-interval: 1000# 最大重試次數(shù)max-attempts: 3#開啟手動確認消息acknowledge-mode: manual

監(jiān)聽類
?

package com.rabbitmqprovider.service;import com.rabbitmq.client.Channel;
import com.rabbitmqprovider.commons.CommonUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;import java.io.IOException;/**** 防止重復(fù)消費*/
@Slf4j
@Service
public class TestBasicService {@Autowiredprivate StringRedisTemplate redisTemplate;/*** RabbitListener 可以寫在類、方法上* @param channel* @param message* @throws IOException*///@RabbitListener(queues = {CommonUtils.queueStr})@RabbitHandlerpublic void getMessage(Channel channel, Message message) throws IOException {try{String messageId= message.getMessageProperties().getMessageId();String msg = new String(message.getBody(),"UTF-8");//判斷messageId在redis中是否存在boolean flage=stringRedisTemplate(messageId,msg);if(!flage){log.error("消息已重復(fù)處理失敗,拒絕再次接收...");channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒絕消息}else{//如果要防止 重復(fù)消費,則需要將 id值存在 redis,每次 都要去redis中拿id比對,是否存在,存在則消費過->messageIdchannel.basicAck(message.getMessageProperties().getDeliveryTag(), false);log.info("接收到的消息{}->"+redisTemplate.opsForValue().get(messageId));}}catch (Exception e){if (message.getMessageProperties().getRedelivered()) {log.error("消息已重復(fù)處理失敗,拒絕再次接收...");channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒絕消息} else {log.error("消息即將再次返回隊列處理...");channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);}}}/*** 判斷Key是否存在* @param messageId 唯一表示key* @param msg ? ? ? value值* @return*/private boolean stringRedisTemplate(String messageId,String msg){log.info("messageId="+messageId);//判斷Key是否存在 有則返回true,沒有則返回falseif(redisTemplate.hasKey(messageId)){return false;}else{redisTemplate.opsForValue().setIfAbsent(messageId, msg);}return true;}
}

------------------------------------------controller--------------------------------------------------

/*** 解決重復(fù)消費問題*/
@GetMapping("/sendMessageTestOnly")
public void sendMessageTestOnly(){JSONObject jsonObject = new JSONObject();jsonObject.put("message","世界很大!");jsonObject.put("msg","你想去看看么?");String json = jsonObject.toJSONString();String messageId=UUID.randomUUID()+"";Message message = MessageBuilder.withBody(json.getBytes()).setContentType(MessageProperties.CONTENT_TYPE_JSON).setContentEncoding("UTF-8").setMessageId(messageId).build();rabbitTemplate.convertAndSend(CommonUtils.dirExchange,CommonUtils.routingKey,message,new CorrelationData(UUID.randomUUID().toString()));
}

---------------------------------回調(diào)------------------------------------------------------

package com.rabbitmqprovider.callback;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;/*** 隊列防止消息丟失*/
@Slf4j
@Component
public class QueueCallback implements RabbitTemplate.ReturnCallback{@Overridepublic void returnedMessage(Message message,int replyCode, String replyText, String exchange, String routingKey) {log.info("消息 {} 經(jīng)交換機 {} 通過routingKey={} 路由到隊列失敗,失敗code為:{}, 失敗原因為:{}",new String(message.getBody()), exchange, routingKey, replyCode, replyText);}
}
package com.rabbitmqprovider.callback;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;/*** 當(dāng)消息由生產(chǎn)者發(fā)到交換機后會回調(diào)該接口中的confirm方法*/
@Component
@Slf4j
public class ExchangeCallback implements RabbitTemplate.ConfirmCallback{/* correlationData 內(nèi)含消息內(nèi)容* ack 交換機接受成功或者失敗。 true表示交換機接受消息成功, false表示交換機接受失敗* cause 表示失敗原因*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if (ack){log.info("交換機收到消息 消息內(nèi)容為{}->", correlationData);}else {log.info("交換機未收到消息消息內(nèi)容為{}, 原因為{}->", correlationData, cause);}}}

-----------------------------------------------------------------------------------------------------------------

執(zhí)行順序時:先發(fā)送消息;然后在接收消息,并判斷消息是否重復(fù),如果不重復(fù) 則回復(fù)消息,否則 拒絕回復(fù);最后回調(diào)。

http://m.risenshineclean.com/news/65664.html

相關(guān)文章:

  • html5網(wǎng)站導(dǎo)航品牌策略怎么寫
  • 做家教網(wǎng)站要多少錢google安卓手機下載
  • 河南省城鄉(xiāng)建設(shè)廳官網(wǎng)seo優(yōu)化中商品權(quán)重主要由什么決定
  • 網(wǎng)站設(shè)計流程包括百度指數(shù)app官方下載
  • 商標(biāo) 做網(wǎng)站 是幾類谷歌seo優(yōu)化
  • 做網(wǎng)站應(yīng)該怎么做行業(yè)數(shù)據(jù)統(tǒng)計網(wǎng)站
  • 石家莊哪有個人建站的網(wǎng)頁設(shè)計素材網(wǎng)站
  • 做童裝在哪個網(wǎng)站找客戶搜索引擎優(yōu)化seo方案
  • 推銷別人做網(wǎng)站有什么作用專業(yè)搜索引擎優(yōu)化電話
  • 南京整站優(yōu)化推廣和競價代運營
  • 做衣服接訂單的網(wǎng)站專業(yè)搜索引擎seo技術(shù)公司
  • 做網(wǎng)站有必要要源碼嗎營銷型網(wǎng)站有哪些功能
  • 給你一個網(wǎng)站你怎么做的嗎怎么查詢最新網(wǎng)站
  • 阿里云可以建網(wǎng)站嗎網(wǎng)絡(luò)宣傳方式有哪些
  • 網(wǎng)站和網(wǎng)頁的概念免費的網(wǎng)站域名查詢565wcc
  • 福田網(wǎng)站開發(fā)老王搜索引擎入口
  • 如何購買建設(shè)網(wǎng)站系統(tǒng)it培訓(xùn)班真的有用嗎
  • 怎樣做約票的網(wǎng)站意思電商營銷推廣方案
  • 建網(wǎng)站報價 優(yōu)幫云企業(yè)培訓(xùn)考試
  • 網(wǎng)站建設(shè)有哪些方法google seo 優(yōu)化招聘
  • 做網(wǎng)站域名服務(wù)器網(wǎng)站建設(shè)公司簡介
  • bt磁力搜索引擎win10優(yōu)化大師免費版
  • 網(wǎng)站 微信小程序怎么做百度指數(shù)搜索
  • 北龍中網(wǎng) 可信網(wǎng)站驗證 費用百度手機怎么刷排名多少錢
  • 招聘網(wǎng)站分析如何做網(wǎng)店推廣方案
  • 網(wǎng)站續(xù)費怎么做分錄百度seo關(guān)鍵詞排名優(yōu)化
  • 如何做視頻網(wǎng)站技術(shù)優(yōu)化大師軟件下載
  • 網(wǎng)站建設(shè)綿陽網(wǎng)絡(luò)營銷品牌案例
  • 專門做物理的網(wǎng)站網(wǎng)絡(luò)營銷的概念和特點是什么
  • 網(wǎng)站后臺登陸不進去是怎么回事網(wǎng)絡(luò)營銷手段有哪四種