如何使我的连接保持活动状态以使用 RabbitMQ streadway/amqp 发布消息?

How to keep my connection alive for publishing messages with RabbitMQ streadway/amqp?


var (
    Connection *amqp.Connection
    Channel *amqp.Channel
    err error

func Connect() {

    Connection, err = amqp.Dial("amqp://guest:guest@localhost:5672")
    FailOnError(err, "Failed to connect to RabbitMQ")

    Channel, err = Connection.Channel()
    FailOnError(err, "Failed to open a channel")

func CloseConnection() {
    err = Channel.Close()
    FailOnError(err, "Failed to close channel ")
    err = Connection.Close()
    FailOnError(err, "Failed to close connection ")

func KeepAlive() {

    queue, err := Channel.QueueDeclare(
        "hello", // name
        false,   // durable
        false,   // delete when unused
        false,   // exclusive
        false,   // no-wait
        nil,     // arguments
    FailOnError(err, "couldn't publish tics")

    tic := "tic"
    for {
        err := Channel.Publish(
            "",         // exchange
            queue.Name, // routing key
            false,      // mandatory
            false,      // immediate
            amqp.Publishing {
                ContentType: "text/plain",
                Body:        []byte(tic),
                Expiration: "5000",
        FailOnError(err, "couldn't publish tics")
        time.Sleep(5 *time.Second)

func FailOnError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s: %s", msg, err)

函数 KeepAlive 是一个无限循环,每 5 秒发送一次虚拟消息,并且该消息的 TTL 也为 5 秒,因此它会被销毁。

func main() {
    defer rabbitmq.CloseConnection()
    go func() {
        //publisher connection to stay alive as long as application is running

    router := gin.Default()

    router.POST("/whatever", whatever)

    err := router.Run()
    if err != nil {

在这里,我正在创建连接并在 goroutine 中调用 KeepAlive,这样它就可以在后台工作,让我的连接始终保持活动状态。


旁注:发送的这些抽动将被发送到一个虚拟队列,因为如果我将它发送到我的队列,我使用另一个服务的消息,它将卡在没有 TTL 和这些抽搐会变得非常大。

使用 streadway/amqp 你不需要自己实现 keepalive。图书馆已经提供了这种机制。

方法amqp.Dial constructs a Connection with a default heartbeat of 10 seconds. You can see the code here:

// connection.go
func Dial(url string) (*Connection, error) {
    return DialConfig(url, Config{
        Heartbeat: defaultHeartbeat,
        Locale:    defaultLocale,

这是通过在打开的连接上发送 heartbeat frames 来实现的,这比将虚假消息发送到仅为该原因创建的队列更有效和更易于维护。

从上面可以看出,您可以使用 amqp.DialConfig 更改连接心跳:

    conn, err := amqp.DialConfig(url, amqp.Config{
        Heartbeat: 5 * time.Second,

您可能想要自己实现的是错误重连逻辑。为此,您可以在此处找到一些有用的信息:How to check if the channel is still working in streadway/amqp RabbitMQ client?