使用 QPid 和 golang 包装器 Electron 连接到 AMQP 1.0 Azure EventHub

Connecting to the AMQP 1.0 Azure EventHub with QPid and the golang wrapper Electron

我想使用 Qpid 质子-c 库的 Electron golang 包装器连接到 Azure EventHub。

我将以下 SASL 详细信息设置为构建连接字符串所需的 host/port/namespace/path,但由于某种原因,我不断收到错误消息:connection reset by peer.

package main

import (
    "fmt"
    "os"
    "strings"
    "qpid.apache.org/amqp"
    "qpid.apache.org/electron"
)

var (
    eventHubNamespaceName = "<MY_CUSTOM_NAMESPACE>"
    eventHubName = "<MY_CUSTOM_NAME>"
    eventHubSasKeyName = "<MY_CUSTOM_SAS_KEY_NAME>"
    eventHubSasKey = "<MY_CUSTOM_SAS_KEY>" // this is the base64 encoded stuff
)

func main() {

    sentChan := make(chan electron.Outcome) // Channel to receive acknowledgements.
    container := electron.NewContainer(fmt.Sprintf("send[%v]", os.Getpid()))

    urlStr := fmt.Sprintf("amqp://%s.servicebus.windows.net:5671/%s", eventHubNamespaceName, eventHubName)
    fmt.Printf("The URL connection string: '%v'\n", urlStr)

    // parse URL
    url, err := amqp.ParseURL(urlStr)
    if err != nil {
        panic(err)
    }
    fmt.Printf("The AMQP parsed URL: %v\n", url)

    // TCP dial
    amqpHost := url.Host
    fmt.Printf("The AMQP host used in the connection is: '%v'\n", amqpHost)
    c, err := container.Dial(
        "tcp", amqpHost, 
        electron.SASLEnable(), 
        electron.Password([]byte(eventHubSasKey)), 
        electron.User(eventHubSasKeyName),
    )
    if err != nil {
        panic(err)
    }
    defer c.Close(nil)

    // AMQP send
    addr := strings.TrimPrefix(url.Path, "/")
    s, err := c.Sender(electron.Target(addr))
    if err != nil {
        panic(err)
    }
    m := amqp.NewMessage()
    body := fmt.Sprintf("bla bla bla %v", 42)
    m.Marshal(body)
    fmt.Printf("The AMQP message body: '%v'\n", m.Body())

    go s.SendAsync(m, sentChan, body) // Outcome will be sent to sentChan

    // AMQP ACK receive
    fmt.Printf("Waiting for ACKs...\n")
    for {
        fmt.Printf("Waiting for an ACK coming out of the channel...\n")
        out := <-sentChan // Outcome of async sends.
        fmt.Printf("Received something: '%v'\n", out)
    }   
}

编译时,然后运行代码,这是输出:

The URL connection string: 'amqp://<MY_CUSTOM_NAMESPACE>.servicebus.windows.net:5671/<MY_CUSTOM_NAME>'
The AMQP parsed URL: 'amqp://<MY_CUSTOM_NAMESPACE>.servicebus.windows.net:5671/<MY_CUSTOM_NAME>'
The AMQP host used in the connection is: '<MY_CUSTOM_NAMESPACE>.servicebus.windows.net:5671'
The AMQP message body: 'bla bla bla 42'
Waiting for ACKs...
Waiting for an ACK coming out of the channel...
Received something: '{unsent : read tcp <MY_PRIVATE_IP_IN_LAN>:<SOME_PORT>-><THE_NSLOOKUP_IP_OF_THE_AZURE_EVENTHUB>:5671: read: connection reset by peer bla bla bla 42}'
Waiting for an ACK coming out of the channel...

对我来说,收到的消息说 connection reset by peer 看起来不像是有效的 ACK,我不确定连接尝试有什么问题?

我不确定如何继续这里,因为我卡在连接部分,我相信 SASL 详细信息根据此处的文档以正确的方式传递:https://godoc.org/qpid.apache.org/electron#ConnectionOption

我仍然不确定失败的原因不是因为 SSL 证书,如果是这样的话,我正在努力了解如何将它们包含在流程中。

编辑:

我后来发现我必须通过 TCP 建立 TLS 连接,即使我没有提供任何 private/public 对密钥,也指定了 "virtual host"(否则 AMQP 抱怨无法识别主持人):

    // TLS connection details
    tlsConfig := &tls.Config{}
    eventHubDomainPort := fmt.Sprintf("%s.servicebus.windows.net:5671", eventHubNamespaceName)
    tlsConn, err := tls.Dial("tcp", eventHubDomainPort, tlsConfig)
    if err != nil {
        panic(err)
    }

    // AMPQ container connection on top of TLS via TCP
    eventHubDomain := fmt.Sprintf("%s.servicebus.windows.net", eventHubNamespaceName)
    amqpConn, err := container.Connection(
        tlsConn, 
        electron.SASLEnable(),
        electron.User(eventHubSasKeyName), 
        electron.Password([]byte(eventHubSasKey)),
        electron.VirtualHost(eventHubDomain),
        // electron.SASLAllowedMechs(<SOME_MECHANISM>),
    )
    if err != nil {
        panic(err)
    }
    defer amqpConn.Close(nil)

    // AMQP sender (a AMQP link with target the name defined on the Azure portal)
    s, err := amqpConn.Sender(electron.Target(eventHubName))
    if err != nil {
        panic(err)
    }

然而,当 运行 具有环境变量 PN_TRACE_FRM=true 的应用程序(这在 proton-c 级别给我一些详细的日志记录)现在错误是:

[handle=0, closed=true, error=@error(29) [condition=:"amqp:unauthorized-access", description="Unauthorized access. 'Send' claim(s) are required to perform this operation. Resource: 'sb://<MY_CUSTOM_NAMESPACE>.servicebus.windows.net/<MY_CUSTOM_NAME>'. TrackingId:<SOME_UUID-ISH_HERE>, SystemTracker:<A_LABEL_HERE>, Timestamp:10/25/2017 4:02:58 PM"]]

这个 afaik 意味着 SASL 详细信息 (username/password) 必须是 "sender" 类型,因为我正在尝试将某些内容发送到事件中心。我在 Azure 门户上仔细检查了这些详细信息(单击 "Shared access policies" > 然后使用将 "claim" 指定为 "Send" 的策略),它们是正确的。所以我不确定为什么会收到此错误。

我实际上尝试了在不同级别的 Azure 门户上定义的这些 SASL 策略,<MY_CUSTOM_NAMESPACE><MY_CUSTOM_NAME>,但总是出现相同的错误消息。

我还尝试包括各种 SASL 机制,例如使用 electron.SASLAllowedMechs("PLAIN") 时出现此错误:no mechanism available: No worthy mechs found (Authentication failed [mech=none]).

在端口 5671 的 urlStr 中使用 "amqps" 方案。事件中心不允许纯 tcp 连接。您还需要启用 SASL PLAIN 以发送在命名空间或事件中心实体上配置的 SAS 密钥(用户名=密钥名称,密码=密钥)(看起来您已经在这样做了)。我不确定 golang,但使用 Python 绑定可以将所有内容放入像这样 "amqps://sas-key-name:url-encoded-key@your-namespace.servicebus.windows.net:5671" 的 Uri 中。端口号是可选的。

如果底层 proton-c 引擎发现不同的受支持 SASL 机制,则它可能不会使用 SASL PLAIN。要强制执行 PLAIN,您可以在容器上设置允许的机制。在 go 中,SASLAllowedMechs 函数似乎为您提供了一个连接选项,您可以在创建连接时提供该选项。

这是与事件中心配合得很好的Python code

我设法在 AMQP 之上使用 "Claims-based authorization" (CBS) 建立了连接。这似乎是微软特有的东西。可以在本页底部找到一些详细信息:https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-amqp-protocol-guide

基本上这是步骤列表:

  • TLS 连接 electron.VirtualHost(eventHubDomain)ANONYMOUS SASL 机制 electron.SASLAllowedMechs("ANONYMOUS")(无需指定 SASL 用户名和密码)。在上面我的问题的 Edit 部分查看详细信息 ^.
  • AMQP link 用于特殊的 $cbs 事件中心名称:cbsLink, err := amqpConnection.Sender(electron.Target("$cbs"))
  • 根据 Microsoft 对 CBS 握手的要求准备 AMQP 消息:

消息属性(检查此 C# 代码以进行比较 https://github.com/Azure/amqpnetlite/blob/master/Examples/ServiceBus/Scenarios/CbsAsyncExample.cs):

appProps := make(map[string]interface{})
appProps["operation"] = "put-token"
appProps["type"] = "servicebus.windows.net:sastoken"
appProps["name"] = "amqp://<MY_CUSTOM_NAMESPACE>.servicebus.windows.net/<MY_CUSTOM_NAME>"

按 Microsoft 要求的方式格式化的 SAS 令牌,我已经修改了这段代码:https://github.com/michaelbironneau/asbclient/blob/master/azure.go 这样:

aqClient := newClient(Queue, "<MY_CUSTOM_NAMESPACE>", "<MY_CUSTOM_SAS_KEY_NAME>", "<MY_CUSTOM_SAS_KEY>")
sasToken := aqClient.authHeader("amqp://<MY_CUSTOM_NAMESPACE>.servicebus.windows.net/<MY_CUSTOM_NAME>", aqClient.signatureExpiry(time.Now()))

那段代码 ^ 基于此处的 python SDK:https://github.com/Azure/azure-sdk-for-python/blob/master/azure-servicebus/azure/servicebus/servicebusservice.py 包含很多东西,例如 upper/lower 大小写 URL 编码,与过期时间戳混合用途以及 SASL 用户名和密码。

构建 AMQP 消息导入 "qpid.apache.org/amqp":

cbsHandshakeMsg := amqp.NewMessage()
cbsHandshakeMsg.SetApplicationProperties(appProps)
cbsHandshakeMsg.Marshal(sasToken)
  • 使用 outcome := cbsLink.SendSync(cbsHandshakeMsg) 发送此 AMQP 消息,然后奇迹般地,您应该在一段时间内通过事件中心的身份验证。
  • 首先将 AMQP link 设置为您要连接的事件中心名称:msgSender, err := amqpConnection.Sender(electron.Target("<MY_CUSTOM_NAME>"))

现在您可以使用最后一个 AMQP link 以这种方式发送您想要发送的消息:

m := amqp.NewMessage()
m.Marshal("my message: bla bla bla, foo bar baz!")
outcome := msgSender.SendSync(m)

完成:)

运行 这段带有环境变量 PN_TRACE_FRM=true 的代码对排除 AMQP 故障有很大帮助,因为 proton-c 库记录了很多有用的调试消息。

出于某种原因,AMQP PLAIN 机制在连接尝试期间直接传递 SASL 用户名和密码不适用于事件中心。这可能是他们或 Electron/Qpid 库的问题,我不确定,但现在至少有人能够使用 golang 和 CBS Microsoft 协议发送消息他们可用。

azure AMQP protocol guide 所述,需要 TLS。

After setting up the connection and TLS, Service Bus offers two SASL mechanism options:

  1. SASL PLAIN is commonly used for passing username and password credentials to a server. Service Bus does not have accounts, but named Shared Access Security rules, which confer rights and are associated with a key. The name of a rule is used as the user name and the key (as base64 encoded text) is used as the password. The rights associated with the chosen rule govern the operations allowed on the connection.
  2. SASL ANONYMOUS is used for bypassing SASL authorization when the client wants to use the claims-based-security (CBS) model that is described later. With this option, a client connection can be established anonymously for a short time during which the client can only interact with the CBS endpoint and the CBS handshake must complete.

我们可以选择 SASL PLAIN 或 CBS 进行身份验证,以 PLAIN 为例,我稍微修改了你的代码,它按预期工作。神奇的部分在连接选项下面:

amqpConn, err := container.Connection(
    tlsConn,
    electron.SASLEnable(),
    electron.Password([]byte(eventHubSasKey)),
    electron.User(eventHubSasKeyName),
    electron.VirtualHost(eventHubDomain),
    electron.SASLAllowInsecure(true),
    electron.SASLAllowedMechs("PLAIN"),
)

SASLAllowInsecure returns 允许或禁止明文 SASL 身份验证机制的 ConnectionOption,如果我们选择使用 SASL PLAIN,则应将其设置为 true。

希望对您有所帮助。