你如何连接到 AMQP 1.0 主题而不在 Golang 中排队
How do you connect to an AMQP 1.0 topic not queue in Golang
我一直在尝试 go-amp package README 上的示例代码,但我想连接到一个主题而不是队列,如今天的自述文件中的示例代码所示。
我所做的只是将主题名称放在 'queue-name' 这样放置的位置。
package main
import (
"context"
"fmt"
"log"
"time"
"github.com/Azure/go-amqp"
)
const host = "example.com"
const topic = "/topic/my_topic"
const port = "5672"
const username = "my_username"
const password = "my_password"
// A hleper function to handle errors
func failOnError(err error, msg string){
if err != nil {
log.Fatalf("%s %s", msg, err)
}
}
func main(){
// connect to remote amqp server
host_address := fmt.Sprintf("amqps://%s:%s", host, port)
log.Println("Connecting to ", host_address)
client, err := amqp.Dial(host_address,
amqp.ConnSASLPlain(username, password),
)
failOnError(err, "Failed to connect to Server")
defer client.Close()
// Open a session
session, err := client.NewSession()
failOnError(err, "Failed to create AMQP session")
ctx := context.Background()
// Continuously read messages
{
// Create a receiver
receiver, err := session.NewReceiver(
amqp.LinkSourceAddress(topic),
amqp.LinkCredit(10),
)
failOnError(err, "Failed creating receiver link")
defer func() {
ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
receiver.Close(ctx)
cancel()
}()
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
for {
// Receive next message
msg, err := receiver.Receive(ctx)
failOnError(err, "Failed reading message from AMQP:")
// Accept message
msg.Accept(context.Background())
fmt.Printf("Message received: Body: %s\n", msg.Value)
}
}
}
我一直收到这个错误。
Failed creating receiver link *Error{Condition: amqp:unauthorized-access,
Description: User my_username is not authorized to read from: queue:///topic/my_topic, Info: map[]}
看起来它正在将我的主题视为一个队列。如何设置接收器尝试附加到主题而不是队列?
编辑
我正在使用一个使用 AMQP 1.0 的 ActiveMQ 代理。它由其他人管理,所以我只需要使用 AMQP 1.0。
这意味着我不能使用更流行的 go amqp package as it has no support for AMQP 1.0. Thanks Tim Bish 来提醒我添加这个。
经过反复试验,我认为可以将主题更改为 'topic://my_topic'
const topic = "topic://my_topic"
其余代码保持不变。我创建了一个 gist 用于发送和接收主题。
我希望这能帮助像我这样的新手在键盘上敲打他们的脑袋。
我一直在尝试 go-amp package README 上的示例代码,但我想连接到一个主题而不是队列,如今天的自述文件中的示例代码所示。
我所做的只是将主题名称放在 'queue-name' 这样放置的位置。
package main
import (
"context"
"fmt"
"log"
"time"
"github.com/Azure/go-amqp"
)
const host = "example.com"
const topic = "/topic/my_topic"
const port = "5672"
const username = "my_username"
const password = "my_password"
// A hleper function to handle errors
func failOnError(err error, msg string){
if err != nil {
log.Fatalf("%s %s", msg, err)
}
}
func main(){
// connect to remote amqp server
host_address := fmt.Sprintf("amqps://%s:%s", host, port)
log.Println("Connecting to ", host_address)
client, err := amqp.Dial(host_address,
amqp.ConnSASLPlain(username, password),
)
failOnError(err, "Failed to connect to Server")
defer client.Close()
// Open a session
session, err := client.NewSession()
failOnError(err, "Failed to create AMQP session")
ctx := context.Background()
// Continuously read messages
{
// Create a receiver
receiver, err := session.NewReceiver(
amqp.LinkSourceAddress(topic),
amqp.LinkCredit(10),
)
failOnError(err, "Failed creating receiver link")
defer func() {
ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
receiver.Close(ctx)
cancel()
}()
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
for {
// Receive next message
msg, err := receiver.Receive(ctx)
failOnError(err, "Failed reading message from AMQP:")
// Accept message
msg.Accept(context.Background())
fmt.Printf("Message received: Body: %s\n", msg.Value)
}
}
}
我一直收到这个错误。
Failed creating receiver link *Error{Condition: amqp:unauthorized-access,
Description: User my_username is not authorized to read from: queue:///topic/my_topic, Info: map[]}
看起来它正在将我的主题视为一个队列。如何设置接收器尝试附加到主题而不是队列?
编辑
我正在使用一个使用 AMQP 1.0 的 ActiveMQ 代理。它由其他人管理,所以我只需要使用 AMQP 1.0。
这意味着我不能使用更流行的 go amqp package as it has no support for AMQP 1.0. Thanks Tim Bish 来提醒我添加这个。
经过反复试验,我认为可以将主题更改为 'topic://my_topic'
const topic = "topic://my_topic"
其余代码保持不变。我创建了一个 gist 用于发送和接收主题。
我希望这能帮助像我这样的新手在键盘上敲打他们的脑袋。