如何使我的连接保持活动状态以使用 RabbitMQ streadway/amqp 发布消息?
How to keep my connection alive for publishing messages with RabbitMQ streadway/amqp?
由于每次打开连接进行发布的成本很高,我正在尝试实现某种方式来保持连接活动并在我的应用程序中共享它以发布消息。
var (
Connection *amqp.Connection
Channel *amqp.Channel
err error
)
func Connect() {
Connection, err = amqp.Dial("amqp://guest:guest@localhost:5672")
FailOnError(err, "Failed to connect to RabbitMQ")
Channel, err = Connection.Channel()
FailOnError(err, "Failed to open a channel")
}
func CloseConnection() {
err = Channel.Close()
FailOnError(err, "Failed to close channel ")
err = Connection.Close()
FailOnError(err, "Failed to close connection ")
}
func KeepAlive() {
queue, err := Channel.QueueDeclare(
"hello", // name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
FailOnError(err, "couldn't publish tics")
tic := "tic"
for {
err := Channel.Publish(
"", // exchange
queue.Name, // routing key
false, // mandatory
false, // immediate
amqp.Publishing {
ContentType: "text/plain",
Body: []byte(tic),
Expiration: "5000",
})
FailOnError(err, "couldn't publish tics")
time.Sleep(5 *time.Second)
}
}
func FailOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
函数 KeepAlive
是一个无限循环,每 5 秒发送一次虚拟消息,并且该消息的 TTL 也为 5 秒,因此它会被销毁。
func main() {
rabbitmq.Connect()
defer rabbitmq.CloseConnection()
go func() {
//publisher connection to stay alive as long as application is running
rabbitmq.KeepAlive()
}()
data_layer.OpenDBConnection()
router := gin.Default()
router.POST("/whatever", whatever)
err := router.Run()
if err != nil {
log.Fatal(err.Error())
}
}
在这里,我正在创建连接并在 goroutine 中调用 KeepAlive
,这样它就可以在后台工作,让我的连接始终保持活动状态。
我的问题:
我觉得这种方法只是一种变通方法,虽然我已经尝试搜索如何使其保持活力的示例,但似乎所有这些示例都对消费者方面感兴趣。有没有更简洁的方法来保持我的连接?
只要我的应用程序 运行 不好,我的连接就一直保持活动状态吗?性能明智(网络,内存使用)?注意:我打算用 Prometheus 监控它以观察性能,但任何关于我可能面临的问题的注释都会有所帮助
旁注:发送的这些抽动将被发送到一个虚拟队列,因为如果我将它发送到我的队列,我使用另一个服务的消息,它将卡在没有 TTL 和这些抽搐会变得非常大。
使用 streadway/amqp
你不需要自己实现 keepalive。图书馆已经提供了这种机制。
方法amqp.Dial
constructs a Connection
with a default heartbeat of 10 seconds. You can see the code here:
// connection.go
func Dial(url string) (*Connection, error) {
return DialConfig(url, Config{
Heartbeat: defaultHeartbeat,
Locale: defaultLocale,
})
}
这是通过在打开的连接上发送 heartbeat frames 来实现的,这比将虚假消息发送到仅为该原因创建的队列更有效和更易于维护。
从上面可以看出,您可以使用 amqp.DialConfig
更改连接心跳:
conn, err := amqp.DialConfig(url, amqp.Config{
Heartbeat: 5 * time.Second,
})
您可能想要自己实现的是错误重连逻辑。为此,您可以在此处找到一些有用的信息:How to check if the channel is still working in streadway/amqp RabbitMQ client?
由于每次打开连接进行发布的成本很高,我正在尝试实现某种方式来保持连接活动并在我的应用程序中共享它以发布消息。
var (
Connection *amqp.Connection
Channel *amqp.Channel
err error
)
func Connect() {
Connection, err = amqp.Dial("amqp://guest:guest@localhost:5672")
FailOnError(err, "Failed to connect to RabbitMQ")
Channel, err = Connection.Channel()
FailOnError(err, "Failed to open a channel")
}
func CloseConnection() {
err = Channel.Close()
FailOnError(err, "Failed to close channel ")
err = Connection.Close()
FailOnError(err, "Failed to close connection ")
}
func KeepAlive() {
queue, err := Channel.QueueDeclare(
"hello", // name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
FailOnError(err, "couldn't publish tics")
tic := "tic"
for {
err := Channel.Publish(
"", // exchange
queue.Name, // routing key
false, // mandatory
false, // immediate
amqp.Publishing {
ContentType: "text/plain",
Body: []byte(tic),
Expiration: "5000",
})
FailOnError(err, "couldn't publish tics")
time.Sleep(5 *time.Second)
}
}
func FailOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
函数 KeepAlive
是一个无限循环,每 5 秒发送一次虚拟消息,并且该消息的 TTL 也为 5 秒,因此它会被销毁。
func main() {
rabbitmq.Connect()
defer rabbitmq.CloseConnection()
go func() {
//publisher connection to stay alive as long as application is running
rabbitmq.KeepAlive()
}()
data_layer.OpenDBConnection()
router := gin.Default()
router.POST("/whatever", whatever)
err := router.Run()
if err != nil {
log.Fatal(err.Error())
}
}
在这里,我正在创建连接并在 goroutine 中调用 KeepAlive
,这样它就可以在后台工作,让我的连接始终保持活动状态。
我的问题:
我觉得这种方法只是一种变通方法,虽然我已经尝试搜索如何使其保持活力的示例,但似乎所有这些示例都对消费者方面感兴趣。有没有更简洁的方法来保持我的连接?
只要我的应用程序 运行 不好,我的连接就一直保持活动状态吗?性能明智(网络,内存使用)?注意:我打算用 Prometheus 监控它以观察性能,但任何关于我可能面临的问题的注释都会有所帮助
旁注:发送的这些抽动将被发送到一个虚拟队列,因为如果我将它发送到我的队列,我使用另一个服务的消息,它将卡在没有 TTL 和这些抽搐会变得非常大。
使用 streadway/amqp
你不需要自己实现 keepalive。图书馆已经提供了这种机制。
方法amqp.Dial
constructs a Connection
with a default heartbeat of 10 seconds. You can see the code here:
// connection.go
func Dial(url string) (*Connection, error) {
return DialConfig(url, Config{
Heartbeat: defaultHeartbeat,
Locale: defaultLocale,
})
}
这是通过在打开的连接上发送 heartbeat frames 来实现的,这比将虚假消息发送到仅为该原因创建的队列更有效和更易于维护。
从上面可以看出,您可以使用 amqp.DialConfig
更改连接心跳:
conn, err := amqp.DialConfig(url, amqp.Config{
Heartbeat: 5 * time.Second,
})
您可能想要自己实现的是错误重连逻辑。为此,您可以在此处找到一些有用的信息:How to check if the channel is still working in streadway/amqp RabbitMQ client?