注冊(cè)網(wǎng)站域名需要什么資料醫(yī)療器械排名優(yōu)化seo
路由
在上一教程中,我們構(gòu)建了一個(gè)簡(jiǎn)單的日志記錄系統(tǒng)。我們能夠向許多接收者廣播日志消息。
在本教程中,我們將向它添加一個(gè)特性-我們將使它能夠只訂閱消息的一個(gè)子集。例如,我們將只能將關(guān)鍵錯(cuò)誤消息定向到日志文件(以節(jié)省磁盤空間),同時(shí)仍然能夠在控制臺(tái)上打印所有日志消息。
綁定
在前面的示例中,我們已經(jīng)在創(chuàng)建綁定。你可能會(huì)想起以下代碼:
err = ch.QueueBind(q.Name, // queue name"", // routing key"logs", // exchangefalse,nil)
綁定是交換器和隊(duì)列之間的關(guān)系。這可以簡(jiǎn)單地理解為:隊(duì)列對(duì)來(lái)自此交換器的消息感興趣。
綁定可以采用額外的routing_key
參數(shù)。為了避免與Channel.Publish
參數(shù)混淆,我們將其稱為binding key
。這是我們?nèi)绾问褂面I創(chuàng)建綁定的方法:
err = ch.QueueBind(q.Name, // queue name"black", // routing key"logs", // exchangefalse,nil)
綁定密鑰的含義取決于交換器的類型。我們以前使用的fanout交換器只是忽略了這個(gè)值。
直連交換器
我們上一個(gè)教程中的日志系統(tǒng)將所有消息廣播給所有消費(fèi)者。我們希望擴(kuò)展這一點(diǎn),允許根據(jù)消息的嚴(yán)重性過(guò)濾消息。例如,我們可能希望將日志消息寫入磁盤的腳本只接收嚴(yán)重錯(cuò)誤,而不會(huì)在warning或info日志消息上浪費(fèi)磁盤空間。
我們使用fanout
交換器,這并沒(méi)有給我們很大的靈活性——它只能進(jìn)行無(wú)腦廣播。
我們將使用direct
交換器。direct
交換器背后的路由算法很簡(jiǎn)單——消息進(jìn)入其binding key
與消息的routing key
完全匹配的隊(duì)列。
為了說(shuō)明這一點(diǎn),請(qǐng)考慮以下設(shè)置:
在此設(shè)置中,我們可以看到綁定了兩個(gè)隊(duì)列的direct
交換器X
。第一個(gè)隊(duì)列綁定鍵為orange
,第二個(gè)隊(duì)列綁定為兩個(gè),一個(gè)綁定鍵為black
,另一個(gè)為green
。
在這種設(shè)置中,使用orange
路由鍵發(fā)布到交換器的消息將被路由到隊(duì)列Q1
。路由鍵為black
或green
的消息將轉(zhuǎn)到Q2
。所有其他消息將被丟棄。
多重綁定
用相同的綁定鍵綁定多個(gè)隊(duì)列是完全合法的。在我們的示例中,我們可以使用綁定鍵black
在X
和Q1
之間添加綁定。在這種情況下,direct
交換器的行為將類似fanout
,并將消息廣播到所有匹配的隊(duì)列。帶有black
路由鍵的消息將同時(shí)傳遞給Q1
和Q2
。
發(fā)送日志
我們將在日志系統(tǒng)中使用這個(gè)模型。我們將發(fā)送消息到direct
交換器,而不是fanout
。我們將提供嚴(yán)重性(譯注:通常我們使用日志級(jí)別劃分日志信息的嚴(yán)重性)作為路由鍵。這樣,接收腳本將能夠選擇其想要接收的日志級(jí)別。讓我們首先關(guān)注發(fā)送日志。
與往常一樣,我們需要首先創(chuàng)建一個(gè)交換器:
err = ch.ExchangeDeclare("logs_direct", // name"direct", // typetrue, // durablefalse, // auto-deletedfalse, // internalfalse, // no-waitnil, // arguments
)
我們已經(jīng)準(zhǔn)備好發(fā)送一條消息:
err = ch.ExchangeDeclare("logs_direct", // name"direct", // typetrue, // durablefalse, // auto-deletedfalse, // internalfalse, // no-waitnil, // arguments
)
failOnError(err, "Failed to declare an exchange")body := bodyFrom(os.Args)
err = ch.Publish("logs_direct", // exchangeseverityFrom(os.Args), // routing keyfalse, // mandatoryfalse, // immediateamqp.Publishing{ContentType: "text/plain",Body: []byte(body),
})
為了簡(jiǎn)化問(wèn)題,我們假設(shè)“嚴(yán)重性”可以是“info”
、“warning”
、“error”
之一。
訂閱
接收消息的工作方式與上一教程一樣,但有一個(gè)例外——我們將為感興趣的每種嚴(yán)重性(日志級(jí)別)創(chuàng)建一個(gè)新的綁定。
q, err := ch.QueueDeclare("", // namefalse, // durablefalse, // delete when unusedtrue, // exclusivefalse, // no-waitnil, // arguments
)
failOnError(err, "Failed to declare a queue")if len(os.Args) < 2 {log.Printf("Usage: %s [info] [warning] [error]", os.Args[0])os.Exit(0)
}
// 建立多個(gè)綁定關(guān)系
for _, s := range os.Args[1:] {log.Printf("Binding queue %s to exchange %s with routing key %s",q.Name, "logs_direct", s)err = ch.QueueBind(q.Name, // queue names, // routing key"logs_direct", // exchangefalse,nil)failOnError(err, "Failed to bind a queue")
}
完整示例
emit_log_direct.go
腳本的代碼:
package mainimport ("log""os""strings""github.com/streadway/amqp"
)func failOnError(err error, msg string) {if err != nil {log.Fatalf("%s: %s", msg, err)}
}func main() {conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")failOnError(err, "Failed to connect to RabbitMQ")defer conn.Close()ch, err := conn.Channel()failOnError(err, "Failed to open a channel")defer ch.Close()err = ch.ExchangeDeclare("logs_direct", // name"direct", // typetrue, // durablefalse, // auto-deletedfalse, // internalfalse, // no-waitnil, // arguments)failOnError(err, "Failed to declare an exchange")body := bodyFrom(os.Args)err = ch.Publish("logs_direct", // exchangeseverityFrom(os.Args), // routing keyfalse, // mandatoryfalse, // immediateamqp.Publishing{ContentType: "text/plain",Body: []byte(body),})failOnError(err, "Failed to publish a message")log.Printf(" [x] Sent %s", body)
}func bodyFrom(args []string) string {var s stringif (len(args) < 3) || os.Args[2] == "" {s = "hello"} else {s = strings.Join(args[2:], " ")}return s
}func severityFrom(args []string) string {var s stringif (len(args) < 2) || os.Args[1] == "" {s = "info"} else {s = os.Args[1]}return s
}
receive_logs_direct.go
的代碼:
package mainimport ("log""os""github.com/streadway/amqp"
)func failOnError(err error, msg string) {if err != nil {log.Fatalf("%s: %s", msg, err)}
}func main() {conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")failOnError(err, "Failed to connect to RabbitMQ")defer conn.Close()ch, err := conn.Channel()failOnError(err, "Failed to open a channel")defer ch.Close()err = ch.ExchangeDeclare("logs_direct", // name"direct", // typetrue, // durablefalse, // auto-deletedfalse, // internalfalse, // no-waitnil, // arguments)failOnError(err, "Failed to declare an exchange")q, err := ch.QueueDeclare("", // namefalse, // durablefalse, // delete when unusedtrue, // exclusivefalse, // no-waitnil, // arguments)failOnError(err, "Failed to declare a queue")if len(os.Args) < 2 {log.Printf("Usage: %s [info] [warning] [error]", os.Args[0])os.Exit(0)}for _, s := range os.Args[1:] {log.Printf("Binding queue %s to exchange %s with routing key %s",q.Name, "logs_direct", s)err = ch.QueueBind(q.Name, // queue names, // routing key"logs_direct", // exchangefalse,nil)failOnError(err, "Failed to bind a queue")}msgs, err := ch.Consume(q.Name, // queue"", // consumertrue, // auto ackfalse, // exclusivefalse, // no localfalse, // no waitnil, // args)failOnError(err, "Failed to register a consumer")forever := make(chan bool)go func() {for d := range msgs {log.Printf(" [x] %s", d.Body)}}()log.Printf(" [*] Waiting for logs. To exit press CTRL+C")<-forever
}
如果你只想將“warning”和“err”
(而不是“info”)級(jí)別的日志消息保存到文件中,只需打開控制臺(tái)并輸入:
go run receive_logs_direct.go warning error > logs_from_rabbit.log
如果你想在屏幕上查看所有日志消息,請(qǐng)打開一個(gè)新終端并執(zhí)行以下操作:
go run receive_logs_direct.go info warning error
# => [*] Waiting for logs. To exit press CTRL+C
例如,要發(fā)出error
日志消息,只需輸入:
go run emit_log_direct.go error "Run. Run. Or it will explode."
# => [x] Sent 'error':'Run. Run. Or it will explode.'
源自:https://www.rabbitmq.com/getstarted.html