石家莊網(wǎng)站建設(shè)解決方案seo網(wǎng)絡(luò)推廣優(yōu)勢(shì)
springboot整合rabbitmq死信隊(duì)列
什么是死信
說道死信,可能大部分觀眾大姥爺會(huì)有懵逼的想法,什么是死信?死信隊(duì)列,俗稱DLX,翻譯過來的名稱為Dead Letter Exchange 死信交換機(jī)。當(dāng)消息限定時(shí)間內(nèi)未被消費(fèi),成為 Dead Message后,可以被重新發(fā)送到另一個(gè)交換機(jī)中,發(fā)揮其應(yīng)有的價(jià)值!
需要測(cè)試死信隊(duì)列,則需要先梳理整體的思路,如可以采取如下方式進(jìn)行配置:
從上面的邏輯圖中,可以發(fā)現(xiàn)大致的思路:
.1. 消息隊(duì)列分為正常交換機(jī)、正常消息隊(duì)列;以及死信交換機(jī)和死信隊(duì)列。
2. 正常隊(duì)列針對(duì)死信信息,需要將數(shù)據(jù) 重新 發(fā)送至死信交換機(jī)中。
死信使用的場(chǎng)景
- 消息被拒絕
- 消息ttl過期
- 隊(duì)列達(dá)到最大長(zhǎng)度
這三種場(chǎng)景就會(huì)成為死信,然后放入死信交換機(jī)
import org.springframework.amqp.core.\*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;@Configuration
public class RabbitmqConfig {//正常交換機(jī)的名字public final static String EXCHANGE\_NAME = "exchange\_name";//正常隊(duì)列的名字public final static String QUEUE\_NAME="queue\_name";//死信交換機(jī)的名字public final static String EXCHANGE\_DEAD = "exchange\_dead";//死信隊(duì)列的名字public final static String QUEUE\_DEAD="queue\_dead";//死信路由keypublic final static String DEAD\_KEY="dead.key";//創(chuàng)建正常交換機(jī)@Bean(EXCHANGE\_NAME)public Exchange exchange(){return ExchangeBuilder.topicExchange(EXCHANGE\_NAME)//持久化 mq重啟后數(shù)據(jù)還在.durable(true).build();}//創(chuàng)建正常隊(duì)列@Bean(QUEUE\_NAME)public Queue queue(){//正常隊(duì)列和死信進(jìn)行綁定 轉(zhuǎn)發(fā)到 死信隊(duì)列,配置參數(shù)Map<String,Object>map=getMap();return new Queue(QUEUE\_NAME,true,false,false,map);}//正常隊(duì)列綁定正常交換機(jī) 設(shè)置規(guī)則 執(zhí)行綁定 定義路由規(guī)則 requestmaping映射@Beanpublic Binding binding(@Qualifier(QUEUE\_NAME) Queue queue,@Qualifier(EXCHANGE\_NAME) Exchange exchange){return BindingBuilder.bind(queue).to(exchange)//路由規(guī)則.with("app.#").noargs();}//創(chuàng)建死信隊(duì)列@Bean(QUEUE\_DEAD)public Queue queueDead(){return new Queue(QUEUE\_DEAD,true,false,false);}//創(chuàng)建死信交換機(jī)@Bean(EXCHANGE\_DEAD)public Exchange exchangeDead(){return ExchangeBuilder.topicExchange(EXCHANGE\_DEAD).durable(true) //持久化 mq重啟后數(shù)據(jù)還在.build();}//綁定死信隊(duì)列和死信交換機(jī)@Beanpublic Binding deadBinding(){return BindingBuilder.bind(queueDead()).to(exchangeDead())//路由規(guī)則 正常路由key.with(DEAD\_KEY).noargs();}/\*\*獲取死信的配置信息\*\*\*/public Map<String,Object>getMap(){//3種方式 任選其一,選擇其他方式之前,先把交換機(jī)和隊(duì)列刪除了,在啟動(dòng)項(xiàng)目,否則報(bào)錯(cuò)。//方式一Map<String,Object> map=new HashMap<>(16);//死信交換器名稱,過期或被刪除(因隊(duì)列長(zhǎng)度超長(zhǎng)或因空間超出閾值)的消息可指定發(fā)送到該交換器中;map.put("x-dead-letter-exchange", EXCHANGE\_DEAD);//死信消息路由鍵,在消息發(fā)送到死信交換器時(shí)會(huì)使用該路由鍵,如果不設(shè)置,則使用消息的原來的路由鍵值map.put("x-dead-letter-routing-key", DEAD\_KEY);//方式二//消息的過期時(shí)間,單位:毫秒;達(dá)到時(shí)間 放入死信隊(duì)列// map.put("x-message-ttl",5000);//方式三//隊(duì)列最大長(zhǎng)度,超過該最大值,則將從隊(duì)列頭部開始刪除消息;放入死信隊(duì)列一條數(shù)據(jù)// map.put("x-max-length",3);return map;}}
配置文件信息
spring:rabbitmq:host: 192.168.23.135port: 5672username: adminpassword: admin#虛擬主機(jī)virtual-host: dmg-alistener:simple:#自動(dòng)ackacknowledge-mode: autoretry:#最大重試次數(shù)max-attempts: 3#開啟重試enabled: true
引入 rabbitmq 依賴
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
生產(chǎn)者
@RestController
@RequestMapping("p")
public class TestController {@Autowiredprivate RabbitTemplate rabbitTemplate;@GetMapping("/test")public String test(){//正常交換機(jī) 正常路由鍵 正常消息內(nèi)容rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE\_NAME,"app.test","我是生產(chǎn)者");return "aa";}
}
//消費(fèi)者
@Component
public class Xf {//監(jiān)聽正常隊(duì)列名稱@RabbitListener(queues = {RabbitmqConfig.QUEUE\_NAME})public void normal(String payload, Message message, Channel channel) throws IOException {System.out.println("正常消息:"+payload);long tag=message.getMessageProperties().getDeliveryTag();try{// int i=1/0;//手動(dòng)簽收channel.basicAck(tag,true);}catch (RuntimeException runtimeException){//出現(xiàn)異常 刪除消息 放入死信隊(duì)列channel.basicReject(tag,false);}}
監(jiān)聽死信隊(duì)列名稱
@RabbitListener(queues = {RabbitmqConfig.QUEUE\_DEAD})public void dead(String payload, Message message, Channel channel) throws IOException {System.out.println("死信隊(duì)列:"+payload);//刪除消息 放入數(shù)據(jù)庫(kù) 人工處理long deliveryTag=message.getMessageProperties().getDeliveryTag();channel.basicAck(deliveryTag,true);}
}