你如何连接到 AMQP 1.0 主题而不在 Golang 中排队

How do you connect to an AMQP 1.0 topic not queue in Golang

我一直在尝试 go-amp package README 上的示例代码,但我想连接到一个主题而不是队列,如今天的自述文件中的示例代码所示。

我所做的只是将主题名称放在 'queue-name' 这样放置的位置。

package main

import (
    "context"
    "fmt"
    "log"
    "time"

    "github.com/Azure/go-amqp"
)

const host = "example.com"
const topic = "/topic/my_topic"
const port = "5672"
const username = "my_username"
const password = "my_password"

// A hleper function to handle errors
func failOnError(err error, msg string){
    if err != nil {
        log.Fatalf("%s %s", msg, err)
    }
}


func main(){
    // connect to remote amqp server
    host_address := fmt.Sprintf("amqps://%s:%s", host, port)
    log.Println("Connecting to ", host_address)

    client, err := amqp.Dial(host_address,
        amqp.ConnSASLPlain(username, password),
    )
    failOnError(err, "Failed to connect to Server")
    defer client.Close()

    // Open a session
    session, err := client.NewSession()
    failOnError(err, "Failed to create AMQP session")

    ctx := context.Background()

        // Continuously read messages
        {
            // Create a receiver
            receiver, err := session.NewReceiver(
                amqp.LinkSourceAddress(topic),
                amqp.LinkCredit(10),
            )
            failOnError(err, "Failed creating receiver link")
            defer func() {
                ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
                receiver.Close(ctx)
                cancel()
            }()

            log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
    
            for {
                // Receive next message
                msg, err := receiver.Receive(ctx)
                failOnError(err, "Failed reading message from AMQP:")
    
                // Accept message
                msg.Accept(context.Background())    
                fmt.Printf("Message received: Body: %s\n", msg.Value)
            }
        }
}

我一直收到这个错误。

Failed creating receiver link *Error{Condition: amqp:unauthorized-access, 
Description: User my_username is not authorized to read from: queue:///topic/my_topic, Info: map[]}

看起来它正在将我的主题视为一个队列。如何设置接收器尝试附加到主题而不是队列?

编辑
我正在使用一个使用 AMQP 1.0 的 ActiveMQ 代理。它由其他人管理,所以我只需要使用 AMQP 1.0。 这意味着我不能使用更流行的 go amqp package as it has no support for AMQP 1.0. Thanks Tim Bish 来提醒我添加这个。

经过反复试验,我认为可以将主题更改为 'topic://my_topic'

const topic = "topic://my_topic"

其余代码保持不变。我创建了一个 gist 用于发送和接收主题。

我希望这能帮助像我这样的新手在键盘上敲打他们的脑袋。