佛山網(wǎng)站建設(shè)是哪個好東莞谷歌推廣
造成重復(fù)消費的原因:
MQ向消費者推送message,消費者向MQ返回ack,告知所推送的消息消費成功。但是由于網(wǎng)絡(luò)波動等原因,可能造成消費者向MQ返回的ack丟失。MQ長時間(一分鐘)收不到ack,于是會向消費者再次推送該條message,這樣就造成了重復(fù)消費。
解決重復(fù)消費的辦法:
用存儲(redis或者mysql)記錄一下已經(jīng)消費的message的id,當(dāng)message被消費前先去存儲中查一下消費記錄,沒有該條message的id則正常消費返回ack,有該條message的id的話不用消費直接返回ack給MQ。
當(dāng)然實際生產(chǎn)中的話選用redis是比較好的選擇,畢竟查mysql要進行磁盤IO,效率要低得多,而且絕大多數(shù)重復(fù)消費都是由于MQ沒有收到消費者的ack于是造成MQ再次向消費者進行同一條message的投遞。所以message的消費記錄其實我們并不需要一直記錄,只需要保存一段時間,當(dāng)下次投遞過來的時候消費者能查到消費記錄然后準(zhǔn)確返回ack給MQ就行。
yml
#配置rabbitMq 服務(wù)器rabbitmq:host: xxxx#rabbitmq相關(guān)配置 15672是Web管理界面的端口;5672是MQ訪問的端口port: xxxxusername: xxxxpassword: xxxx#虛擬host 可以不設(shè)置,使用server默認hostvirtual-host: xxxxconnection-timeout: 0#確認消息已發(fā)送到隊列(Queue)publisher-returns: true #確認消息已發(fā)送到交換機(Exchange)publisher-confirm-type: correlated# 設(shè)置消費端手動 acklistener:simple:retry:# 開啟消費者(程序出現(xiàn)異常的情況下)進行重試enabled: true#重試間隔時間max-interval: 1000# 最大重試次數(shù)max-attempts: 3#開啟手動確認消息acknowledge-mode: manual
監(jiān)聽類
?
package com.rabbitmqprovider.service;import com.rabbitmq.client.Channel; import com.rabbitmqprovider.commons.CommonUtils; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Service;import java.io.IOException;/**** 防止重復(fù)消費*/ @Slf4j @Service public class TestBasicService {@Autowiredprivate StringRedisTemplate redisTemplate;/*** RabbitListener 可以寫在類、方法上* @param channel* @param message* @throws IOException*///@RabbitListener(queues = {CommonUtils.queueStr})@RabbitHandlerpublic void getMessage(Channel channel, Message message) throws IOException {try{String messageId= message.getMessageProperties().getMessageId();String msg = new String(message.getBody(),"UTF-8");//判斷messageId在redis中是否存在boolean flage=stringRedisTemplate(messageId,msg);if(!flage){log.error("消息已重復(fù)處理失敗,拒絕再次接收...");channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒絕消息}else{//如果要防止 重復(fù)消費,則需要將 id值存在 redis,每次 都要去redis中拿id比對,是否存在,存在則消費過->messageIdchannel.basicAck(message.getMessageProperties().getDeliveryTag(), false);log.info("接收到的消息{}->"+redisTemplate.opsForValue().get(messageId));}}catch (Exception e){if (message.getMessageProperties().getRedelivered()) {log.error("消息已重復(fù)處理失敗,拒絕再次接收...");channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒絕消息} else {log.error("消息即將再次返回隊列處理...");channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);}}}/*** 判斷Key是否存在* @param messageId 唯一表示key* @param msg ? ? ? value值* @return*/private boolean stringRedisTemplate(String messageId,String msg){log.info("messageId="+messageId);//判斷Key是否存在 有則返回true,沒有則返回falseif(redisTemplate.hasKey(messageId)){return false;}else{redisTemplate.opsForValue().setIfAbsent(messageId, msg);}return true;} }
------------------------------------------controller--------------------------------------------------
/*** 解決重復(fù)消費問題*/ @GetMapping("/sendMessageTestOnly") public void sendMessageTestOnly(){JSONObject jsonObject = new JSONObject();jsonObject.put("message","世界很大!");jsonObject.put("msg","你想去看看么?");String json = jsonObject.toJSONString();String messageId=UUID.randomUUID()+"";Message message = MessageBuilder.withBody(json.getBytes()).setContentType(MessageProperties.CONTENT_TYPE_JSON).setContentEncoding("UTF-8").setMessageId(messageId).build();rabbitTemplate.convertAndSend(CommonUtils.dirExchange,CommonUtils.routingKey,message,new CorrelationData(UUID.randomUUID().toString())); }
---------------------------------回調(diào)------------------------------------------------------
package com.rabbitmqprovider.callback;import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.stereotype.Component;/*** 隊列防止消息丟失*/ @Slf4j @Component public class QueueCallback implements RabbitTemplate.ReturnCallback{@Overridepublic void returnedMessage(Message message,int replyCode, String replyText, String exchange, String routingKey) {log.info("消息 {} 經(jīng)交換機 {} 通過routingKey={} 路由到隊列失敗,失敗code為:{}, 失敗原因為:{}",new String(message.getBody()), exchange, routingKey, replyCode, replyText);} }
package com.rabbitmqprovider.callback;import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.stereotype.Component; import org.springframework.util.StringUtils;/*** 當(dāng)消息由生產(chǎn)者發(fā)到交換機后會回調(diào)該接口中的confirm方法*/ @Component @Slf4j public class ExchangeCallback implements RabbitTemplate.ConfirmCallback{/* correlationData 內(nèi)含消息內(nèi)容* ack 交換機接受成功或者失敗。 true表示交換機接受消息成功, false表示交換機接受失敗* cause 表示失敗原因*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if (ack){log.info("交換機收到消息 消息內(nèi)容為{}->", correlationData);}else {log.info("交換機未收到消息消息內(nèi)容為{}, 原因為{}->", correlationData, cause);}}}
-----------------------------------------------------------------------------------------------------------------
執(zhí)行順序時:先發(fā)送消息;然后在接收消息,并判斷消息是否重復(fù),如果不重復(fù) 則回復(fù)消息,否則 拒絕回復(fù);最后回調(diào)。