如何使我的连接保持活动状态以使用 RabbitMQ streadway/amqp 发布消息?

How to keep my connection alive for publishing messages with RabbitMQ streadway/amqp?

由于每次打开连接进行发布的成本很高,我正在尝试实现某种方式来保持连接活动并在我的应用程序中共享它以发布消息。

var (
    Connection *amqp.Connection
    Channel *amqp.Channel
    err error
)

func Connect() {

    Connection, err = amqp.Dial("amqp://guest:guest@localhost:5672")
    FailOnError(err, "Failed to connect to RabbitMQ")

    Channel, err = Connection.Channel()
    FailOnError(err, "Failed to open a channel")
}

func CloseConnection() {
    err = Channel.Close()
    FailOnError(err, "Failed to close channel ")
    err = Connection.Close()
    FailOnError(err, "Failed to close connection ")
}

func KeepAlive() {

    queue, err := Channel.QueueDeclare(
        "hello", // name
        false,   // durable
        false,   // delete when unused
        false,   // exclusive
        false,   // no-wait
        nil,     // arguments
    )
    FailOnError(err, "couldn't publish tics")

    tic := "tic"
    for {
        err := Channel.Publish(
            "",         // exchange
            queue.Name, // routing key
            false,      // mandatory
            false,      // immediate
            amqp.Publishing {
                ContentType: "text/plain",
                Body:        []byte(tic),
                Expiration: "5000",
            })
        FailOnError(err, "couldn't publish tics")
        time.Sleep(5 *time.Second)
    }
}

func FailOnError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s: %s", msg, err)
    }
}

函数 KeepAlive 是一个无限循环,每 5 秒发送一次虚拟消息,并且该消息的 TTL 也为 5 秒,因此它会被销毁。

func main() {
    rabbitmq.Connect()
    defer rabbitmq.CloseConnection()
    go func() {
        //publisher connection to stay alive as long as application is running
        rabbitmq.KeepAlive()
    }()

    data_layer.OpenDBConnection()
    router := gin.Default()

    router.POST("/whatever", whatever)

    err := router.Run()
    if err != nil {
        log.Fatal(err.Error())
    }
}

在这里,我正在创建连接并在 goroutine 中调用 KeepAlive,这样它就可以在后台工作,让我的连接始终保持活动状态。

我的问题:

旁注:发送的这些抽动将被发送到一个虚拟队列,因为如果我将它发送到我的队列,我使用另一个服务的消息,它将卡在没有 TTL 和这些抽搐会变得非常大。

使用 streadway/amqp 你不需要自己实现 keepalive。图书馆已经提供了这种机制。

方法amqp.Dial constructs a Connection with a default heartbeat of 10 seconds. You can see the code here:

// connection.go
func Dial(url string) (*Connection, error) {
    return DialConfig(url, Config{
        Heartbeat: defaultHeartbeat,
        Locale:    defaultLocale,
    })
}

这是通过在打开的连接上发送 heartbeat frames 来实现的,这比将虚假消息发送到仅为该原因创建的队列更有效和更易于维护。

从上面可以看出,您可以使用 amqp.DialConfig 更改连接心跳:

    conn, err := amqp.DialConfig(url, amqp.Config{
        Heartbeat: 5 * time.Second,
    })

您可能想要自己实现的是错误重连逻辑。为此,您可以在此处找到一些有用的信息:How to check if the channel is still working in streadway/amqp RabbitMQ client?