網(wǎng)站建設(shè)公司哪好建站優(yōu)化推廣
??一、介紹????
????????RabbitMQ消息傳遞模型的核心思想是:生產(chǎn)者生產(chǎn)的消息從不會(huì)直接發(fā)送到隊(duì)列。實(shí)際上,通常生產(chǎn)者甚至不知道這些消息傳遞到了哪些隊(duì)列中。
???????相反,生產(chǎn)者只能將消息發(fā)送到交換機(jī),交換機(jī)工作的內(nèi)容非常簡單,一方面他接受來自生產(chǎn)者的消息,另一方面他將他們推入隊(duì)列。交換機(jī)必須確切知道如何處理收到的消息。是應(yīng)該把這些消息放到特定隊(duì)列還是說把他們放到許多隊(duì)列還是說應(yīng)該丟棄他們。這就由交換機(jī)來決定。
??二、類型
?????????1、類型
????????總共有以下類型:直接(direct)[路由],主題(topic),標(biāo)題(headers),扇出(fanout)[發(fā)布訂閱],
??????????默認(rèn)類型[無名類型]?通過("")進(jìn)行標(biāo)識(shí)
channel.basicPublish("",TASK_QUEUE_NAME,null,message.getBytes());
??????????第一個(gè)參數(shù)是交換機(jī)名稱,空字符串表示默認(rèn)或無名的交換機(jī);消息能路由發(fā)送到隊(duì)列中,其實(shí)是由routingKey(bindingKey)綁定key指定的,如果它存在的話。
??????2、?臨時(shí)隊(duì)列
???????每當(dāng)我們連接到Rabbit時(shí),我們都需要一個(gè)全新的空隊(duì)列,為此我們可以創(chuàng)建一個(gè)具有隨機(jī)名稱的隊(duì)列,或者能讓服務(wù)器為我們選擇一個(gè)隨機(jī)隊(duì)列名稱。其次一旦我們斷開了消費(fèi)者連接,隊(duì)列將被自動(dòng)刪除。
???????創(chuàng)建臨時(shí)隊(duì)列的方式如下
String queueName = channel.queueDeclare().getQueue();
?????3、綁定(bingings)
???????binding其實(shí)時(shí)exchange和queue之間的橋梁,他告訴我們exchange和哪個(gè)隊(duì)列進(jìn)行了綁定關(guān)系
????4、fanout
????????他是將接收到的所有消息廣播到他知道的所有隊(duì)列中
????????消費(fèi)者,另一個(gè)復(fù)制即可
public class ReceiveLogs01 {//交換機(jī)的名稱public static final String EXCHANGE_NAME = "logs";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtil.getChannel();//聲明一個(gè)交換機(jī)channel.exchangeDeclare(EXCHANGE_NAME,"fanout");//聲明一個(gè)隊(duì)列 臨時(shí)隊(duì)列/*** 隊(duì)列的名稱是隨機(jī)的* 當(dāng)消費(fèi)者斷開與隊(duì)列的連接的時(shí)候,隊(duì)列就自動(dòng)刪除*/String queueName = channel.queueDeclare().getQueue();/*** 綁定隊(duì)列與交換機(jī)*/channel.queueBind(queueName,EXCHANGE_NAME,"");System.out.println("等待接收消息,把接收到的消息打印在屏幕上......");//接收消息DeliverCallback deliverCallback = (consumerTag,message) ->{System.out.println("01控制臺(tái)打印接收到的消息:"+new String(message.getBody()));};CancelCallback cancelCallback = (consumerTag) ->{};channel.basicConsume(queueName,true,deliverCallback,cancelCallback);}
}
???生產(chǎn)者
public class EmitLog {//交換機(jī)的名稱public static final String EXCHANGE_NAME = "logs";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtil.getChannel();//聲明交換機(jī)channel.exchangeDeclare(EXCHANGE_NAME,"fanout");Scanner scanner = new Scanner(System.in);while (scanner.hasNext()){String message = scanner.next();channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes());System.out.println("生產(chǎn)者發(fā)出消息:"+message);}}
}
?????結(jié)果

???5、direct
??????消息只去到他綁定的routingKey隊(duì)列中,支持多重綁定,當(dāng)exchange的綁定類型是direct,但是他綁定的多個(gè)隊(duì)列的key如果都相同,在這種情況下雖然綁定類型是direct但是他表現(xiàn)的就和fanout有點(diǎn)類似了。
?????生產(chǎn)者
public class DirectLogs {//交換機(jī)的名稱public static final String EXCHANGE_NAME = "direct_logs";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtil.getChannel();//聲明交換機(jī)channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);Scanner scanner = new Scanner(System.in);while (scanner.hasNext()){String message = scanner.next();channel.basicPublish(EXCHANGE_NAME,"warning",null,message.getBytes());System.out.println("生產(chǎn)者發(fā)出消息:"+message);}}
}
???消費(fèi)者1
public class ReceiveLogs01 {//交換機(jī)名稱public static final String EXCHANGE_NAME = "direct_logs";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtil.getChannel();//聲明一個(gè)交換機(jī)channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);//聲明一個(gè)隊(duì)列channel.queueDeclare("console",false,false,false,null);//綁定隊(duì)列與交換機(jī)channel.queueBind("console",EXCHANGE_NAME,"info");channel.queueBind("console",EXCHANGE_NAME,"warning");DeliverCallback deliverCallback = (consumerTag,message) ->{System.out.println("direct01控制臺(tái)打印接收到的消息:"+new String(message.getBody()));};CancelCallback cancelCallback = (consumerTag) ->{};channel.basicConsume("console",true,deliverCallback,cancelCallback);}
}
??消費(fèi)者2
public class ReceiveLogs02 {//交換機(jī)名稱public static final String EXCHANGE_NAME = "direct_logs";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtil.getChannel();//聲明一個(gè)交換機(jī)channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);//聲明一個(gè)隊(duì)列channel.queueDeclare("disk",false,false,false,null);//綁定隊(duì)列與交換機(jī)channel.queueBind("disk",EXCHANGE_NAME,"error");DeliverCallback deliverCallback = (consumerTag,message) ->{System.out.println("direct02控制臺(tái)打印接收到的消息:"+new String(message.getBody()));};CancelCallback cancelCallback = (consumerTag) ->{};channel.basicConsume("disk",true,deliverCallback,cancelCallback);}
}
??????5、topic
???????????發(fā)送到類型是topic交換機(jī)的消息的routing_key不能隨意寫,必須滿足一定的要求,它必須是一個(gè)單詞列表,以點(diǎn)號(hào)分隔開。這些單詞可以是任意單詞,比如說“stock.usd.nyse”,"nyse.vmw","quick.orange.rabbit"這種類型的。但是這個(gè)單詞列表最多不能超過255個(gè)字節(jié)?!??可以代替一個(gè)單詞;#可以替代零個(gè)或多個(gè)單詞】
??????????例如Q1->綁定的是orange帶三個(gè)單詞的字符串(*.orange.*)
?????????????????Q2->綁定的是最后一個(gè)是rabbit的3個(gè)單詞(*.*.rabbit)
??????????????????????????第一個(gè)單詞是lazy的多個(gè)單詞(lazy.#)
?????????當(dāng)一個(gè)隊(duì)列綁定鍵是#,那么這個(gè)隊(duì)列將接收所有數(shù)據(jù),有點(diǎn)像fanout;如果隊(duì)列綁定鍵當(dāng)中沒有#h和*出現(xiàn),那么該隊(duì)列綁定類型就是direct了。
????????生產(chǎn)者
public class EmitLogTopic {//交換機(jī)名稱public static final String EXCHANGE_NAME = "topic_logs";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtil.getChannel();Map<String, String> bindingKeyMap = new HashMap<>();bindingKeyMap.put("quick.orange.rabbit","被隊(duì)列Q1Q2接收到");bindingKeyMap.put("lazy.orange.eleplant","被隊(duì)列Q1Q2接收到");bindingKeyMap.put("quick.orange.fox","被隊(duì)列Q1接收到");bindingKeyMap.put("lazy.brown.fox","被隊(duì)列Q2接收到");bindingKeyMap.put("lazy.pink.rabbit","雖然滿足兩個(gè)綁定但只被隊(duì)列Q2接收一次");bindingKeyMap.put("quick.brown.fox","不匹配任何綁定不會(huì)被任何隊(duì)列接收到會(huì)被丟棄");bindingKeyMap.put("quick.orange.male.rabbit","是四個(gè)單詞不匹配任何綁定會(huì)被丟棄");bindingKeyMap.put("lazy.orange.male.rabbit","是四個(gè)單詞但匹配Q2");for (Map.Entry<String, String> bindingKeyEntry : bindingKeyMap.entrySet()) {String routingKey = bindingKeyEntry.getKey();String message = bindingKeyEntry.getValue();channel.basicPublish(EXCHANGE_NAME,routingKey,null,message.getBytes());System.out.println("生產(chǎn)者發(fā)出消息:"+message);}}
}
??消費(fèi)者1
public class ReceiveLogsTopic01 {//交換機(jī)名稱public static final String EXCHANGE_NAME = "topic_logs";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtil.getChannel();//聲明交換機(jī)channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);//聲明隊(duì)列String queueName = "Q1";channel.queueDeclare(queueName,false,false,false,null);channel.queueBind(queueName,EXCHANGE_NAME,"*.orange.*");System.out.println("Q1等待接收消息。。。。。。");//接收消息channel.basicConsume(queueName,true,(consumeTag,message)->{System.out.println(new String(message.getBody()));System.out.println(" 接收隊(duì)列:"+queueName+" 綁定鍵:"+message.getEnvelope().getRoutingKey());},(message)->{});}
}
???消費(fèi)者2
public class ReceiveLogsTopic02 {//交換機(jī)名稱public static final String EXCHANGE_NAME = "topic_logs";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtil.getChannel();//聲明交換機(jī)channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);//聲明隊(duì)列String queueName = "Q2";channel.queueDeclare(queueName,false,false,false,null);channel.queueBind(queueName,EXCHANGE_NAME,"*.*.rabbit");channel.queueBind(queueName,EXCHANGE_NAME,"lazy.#");System.out.println("Q2等待接收消息。。。。。。");//接收消息channel.basicConsume(queueName,true,(consumeTag,message)->{System.out.println(new String(message.getBody()));System.out.println(" 接收隊(duì)列:"+queueName+" 綁定鍵:"+message.getEnvelope().getRoutingKey());},(message)->{});}
}