一、介绍

本教程假设RabbitMQ已安装并运行在本机上的标准端口(5672)。如果你使用不同的主机、端口或凭据,则需要调整连接设置。如果你未安装RabbitMQ,可以浏览我上一篇文章Linux系统服务器安装RabbitMQ

RabbitMQ是一个消息代理:它接受并转发消息。你可以把它想象成一个邮局:当你把你想要邮寄的邮件放进一个邮箱时,你可以确定邮差先生或女士最终会把邮件送到你的收件人那里。在这个比喻中,RabbitMQ是一个邮箱、一个邮局和一个邮递员。

RabbitMQ和邮局的主要区别在于它不处理纸张,而是接受、存储和转发二进制数据块——消息。

RabbitMQ和一般的消息传递都使用一些术语。

生产仅意味着发送。发送消息的程序是生产者: 队列是位于RabbitMQ内部的邮箱的名称。尽管消息通过RabbitMQ和你的应用程序流动,但它们只能存储在队列中。队列只受主机内存和磁盘限制的限制,实际上它是一个大的消息缓冲区。许多生产者可以向一个队列发送消息,而许多消费者可以尝试从一个队列接收数据。以下是我们表示队列的方式: 消费与接收具有相似的含义。消费者是一个主要等待接收消息的程序: 请注意,生产者,消费者和代理(broker)不必位于同一主机上。实际上,在大多数应用程序中它们不是。一个应用程序既可以是生产者,也可以是消费者。

二、Hello World

在本教程的这一部分中,我们将在Go中编写两个小程序:发送单个消息的生产者和接收消息并将其打印出来的消费者。我们将忽略Go-RabbitMQ API中的一些细节,只关注非常简单的事情,以便开始教程。这是一个消息传递版的“Hello World”。

在下图中,“ P”是我们的生产者,“ C”是我们的消费者。中间的框是一个队列——RabbitMQ代表消费者保存的消息缓冲区。

RabbitMQ讲多种协议。本教程使用amqp0-9-1,这是一个开放的、通用的消息传递协议。RabbitMQ有许多不同语言的客户端。在本教程中,我们将使用Go amqp客户端。

首先,使用go get安装amqp

go get github.com/streadway/amqp

三、发送

我们将消息发布者(发送者)称为 send.go,将消息消费者(接收者)称为receive.go。发布者将连接到RabbitMQ,发送一条消息,然后退出。

在send.go中,我们需要首先导入库:

package main

import (

"log"

"github.com/streadway/amqp"

)

我们还需要一个辅助函数来检查每个amqp调用的返回值:

func failOnErrorSend(err error, msg string) {

if err != nil {

log.Fatalf("%s: %s", msg, err)

}

}

然后连接到RabbitMQ服务器

// 1. 尝试连接RabbitMQ,建立连接

// 该连接抽象了套接字连接,并为我们处理协议版本协商和认证等。

conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")

failOnErrorSend(err, "Failed to connect to RabbitMQ")

defer conn.Close()

其中,amqp.Dial 是用于建立连接到 RabbitMQ 的函数,它使用 AMQP 协议进行通信。该函数的参数是 RabbitMQ 服务器的连接字符串。

amqp://:@:/

username : RabbitMQ 用户的用户名。password: RabbitMQ 用户的密码。host: RabbitMQ服务器的主机名或IP地址。port: RabbitMQ 服务器的端口号。vhost:需要连接的虚拟主机。

连接抽象了socket连接,并为我们处理协议版本协商和认证等。接下来,我们创建一个通道,这是大多数用于完成任务的API所在的位置:

// 2. 接下来,我们创建一个通道,大多数API都是用过该通道操作的。

ch, err := conn.Channel()

failOnErrorSend(err, "Failed to open a channel")

defer ch.Close()

要发送,我们必须声明要发送到的队列。然后我们可以将消息发布到队列:

// 3. 声明消息要发送到的队列

q, err := ch.QueueDeclare(

"hello", // 队列的名称

false, // 队列是否持久化

false, // 队列是否在最后一个消费者断开连接后自动删除

false, // 是否设置队列为独占模式

false, // 是否不等待服务器的响应

nil, // 队列的可选参数

)

failOnErrorSend(err, "Failed to declare a queue")

body := "Hello World!"

// 4.将消息发布到声明的队列

err = ch.Publish(

"", // 要发布消息的目标交换器的名称。

q.Name, // 消息的路由键。交换器根据路由键将消息路由到相应的队列。

false, // 是否要求至少一个队列接收该消息

false, // 是否要求消息在发布时立即被消费者接收

amqp.Publishing {

ContentType: "text/plain",

Body: []byte(body),

})//要发布的消息内容

failOnErrorSend(err, "Failed to publish a message")

声明队列是幂等的——仅当队列不存在时才创建。消息内容是一个字节数组,因此你可以在此处编码任何内容。

完整代码:

package main

import (

"github.com/streadway/amqp"

"log"

)

func failOnErrorSend(err error, msg string) {

if err != nil {

log.Fatalf("%s: %s", msg, err)

}

}

func main() {

//1.连接RabbitMQ,建立连接

//该连接抽象了套接字连接,并为我们处理协议版本协商和认证等。

conn, err := amqp.Dial("amqp://licong:123456@8.130.85.112:5672/")

failOnErrorSend(err, "Failed to connect to RabitMQ")

defer conn.Close()

//2.接下来,我们创建一个通道,这是大多数用户完成任务的API所在的位置:

ch, err := conn.Channel()

failOnErrorSend(err, "Fail to open a channel")

defer ch.Close()

//3.声明消息要发送的队列

q, err := ch.QueueDeclare(

"hello", //name

false, //durable

false, //delete when unused

false, //exclusive

false, //no-wait

nil, //arguments

)

failOnErrorSend(err, "Failed to declare a queue")

body := "Hello World!"

//4.将消息发布到声明的队列

err = ch.Publish(

"", //exchange

q.Name, //routing key

false, //mandatory

false, //immediate

amqp.Publishing{

ContentType: "text/plain",

Body: []byte(body),

},

)

failOnErrorSend(err, "Failed to publish a message")

log.Printf(" [x] Sent %s\n", body)

}

四、发送

上面是我们的发布者。我们的消费者监听来自RabbitMQ的消息,因此与发布单个消息的发布者不同,我们将使消费者保持运行状态以监听消息并打印出来。 该代码(在receive.go中)具有与send相同的导入和帮助功能:

import (

"github.com/streadway/amqp"

"log"

)

func failOnErrorReceive(err error, msg string) {

if err != nil {

log.Fatalf("%s: %s", msg, err)

}

}

设置与发布者相同;我们打开一个连接和一个通道,并声明要消耗的队列。请注意,这与send发布到的队列匹配。

//建立连接

conn, err := amqp.Dial("amqp://licong:123456@8.130.85.112:5672/")

failOnErrorReceive(err, "Failed to connect to RabbitMQ")

defer conn.Close()

//获取channel

ch, err := conn.Channel()

failOnErrorReceive(err, "Failed to open a channel")

defer ch.Close()

//声明队列

q, err := ch.QueueDeclare(

"hello", //name

false, //durable

false, //delete when unused

false, //exclusive

false, //no-wait

nil, //argument

)

failOnErrorReceive(err, "Failed to declare a queue")

failOnError(err, "Failed to declare a queue")

请注意,我们也在这里声明队列。因为我们可能在发布者之前启动使用者,所以我们希望在尝试使用队列中的消息之前确保队列存在。

我们将告诉服务器将队列中的消息传递给我们。由于它将异步地向我们发送消息,因此我们将在goroutine中从通道(由amqp::Consume返回)中读取消息。

//获取接收消息的Delivery通道

msgs, err := ch.Consume(

q.Name, //要消费消息的队列名称

"", //消费者标识,留空字符串会自动生成一个唯一标识符。

true, //是否自动确认消息

false, //是否设置队列为独占模式

false, //是否禁止消费者接收自己发布的消息

false, //是否不等待服务器的响应

nil, //队列的可选参数

)

failOnErrorReceive(err, "Failed to register a consumer")

forever := make(chan bool)

go func() {

for d := range msgs {

log.Printf("Reveived a message:%s", d.Body)

}

}()

log.Printf("[*] Waiting for messages. To exit press CTRL+C")

<-forever

五、运行

终端运行:

go run send.go

go run receive.go

源自:https://www.rabbitmq.com/getstarted.html

相关链接

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: