使用 apache qpid 通过 amqpwss 连接到 azure 服务总线端口 443

Connecting to azure service bus port 443 with amqpwss using apache qpid

我正在尝试使用后台 amqp-10-jms-spring-boot starter (which I understand to use apache qpid jms 连接到 Azure 服务总线。

当我设置连接字符串 (amqphub.amqp10jms.remote-url) 以使用 'amqps://' 一切正常时,一条 hello world 消息被发送到 Azure 队列,然后被检索& 由应用程序打印。

然而,当我使用“amqpwss://[Endpoint]:443”连接字符串时,我收到一个异常...

Caused by: io.netty.handler.codec.http.websocketx.WebSocketHandshakeException: Invalid handshake response getStatus: 400 This service does not support WebSocket connections.

我怀疑 Azure 服务总线不支持 WebSocket 连接。

  1. amqphub 文档声称这是可能的:https://github.com/amqphub/amqp-10-jms-spring-boot#jms-connection-configuration
  2. Apache quid 文档也是如此:http://qpid.apache.org/releases/qpid-jms-0.52.0/docs/index.html
  3. 服务总线文档“AMQP WebSockets 绑定在 TCP 端口 443 上创建隧道,然后相当于 AMQP 5671 连接。”:https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-amqp-protocol-guide#connections-and-sessions

如何从 Spring 引导应用程序建立到 Azure 服务总线的 amqpwss 连接?(我会 喜欢 使用 qpid 但我不依赖它)。

示例代码在这里:https://github.com/kevvvvyp/amqp-websocket-apache-qpid

在尝试追加销售并再次查看文档后,我再次证明了 Microsoft 非常有兴趣接纳和供应商将人们锁定在他们的花园中。

撇开长篇大论,URL 连接到 websockets 端点与常规 AMQP 端点不同。假设你有以下

amqps://[endpoint]

要使用 websockets 连接,您需要将协议替换为 amqpwss,并将端口替换为 443

amqpwss://[endpoint]:443

但是,没有提到的是,您还必须指定 /$servicebus/websocket 的路径。结果,最后的url是

amqpwss://[endpoint]:443/$servicebus/websocket

我不知道你应该如何解决这个问题。请证明我是错的,并指出截至 2021 年 3 月 18 日文档中确实包含这一信息的部分。无论如何,我是通过 this SO question connecting from browser via websockets to azure service bus 找到的。

虽然我相信 Dragas 的回答是正确的解决方案,但我认为如果它对任何人有用,我会添加我采用的方法。

我最终使用了 Java azure-service-bus library as I spotted it has a dependency on qpid-proton-j-extensions ,他们自己的 Apache Qpid 扩展库。回购描述为“扩展 qpid-proton-j 库以通过 WEBSOCKETS 讨论 AMQP”......所以我认为这可以完成工作!

我使用 non-descructive read...

创建了一个订阅客户端
 /**
 * Connect & start listening to the azure service bus topic.
 */
public void start() {
    listeningTask = taskExecutor.submit(() -> {
        try {
            SubscriptionClient subscriptionClient = new SubscriptionClient(connectionString, ReceiveMode.PEEKLOCK);
            ExecutorService receiveExecutor = Executors.newCachedThreadPool();
            registerMessageHandlerOnClient(subscriptionClient, receiveExecutor);
        } catch (Exception e) {
            log.error("Caught exception", e);
        }
    });
}

订阅了响应,通过网关将任何消息发送到我的 Spring 集成流中。我添加了一些可配置的属性以在消息消费之间应用回退...

   /**
     * Azure service bus listener.
     *
     * @param receiveClient   client
     * @param executorService executorService
     * @throws Exception If we cannot poll queue.
     */
    private void registerMessageHandlerOnClient(SubscriptionClient receiveClient, ExecutorService executorService) throws Exception {
        // register the RegisterMessageHandler callback
        receiveClient.registerMessageHandler(
                new IMessageHandler() {
                    // callback invoked when the message handler loop has obtained a message
                    public CompletableFuture<Void> onMessageAsync(IMessage message) {

                        log.debug("Message received from azure, id: {}", message.getMessageId()); //TODO deprecation alternative
                        brokerGateway.send(message.getBody());

                        try {
                            Thread.sleep(backOff.toMillis());
                        } catch (InterruptedException e) {
                            log.error("Failed to apply azure backoff", e);
                        }
                        return CompletableFuture.completedFuture(null);
                    }

                    // callback invoked when the message handler has an exception to report
                    public void notifyException(Throwable throwable, ExceptionPhase exceptionPhase) {
                        log.error("Exception {}", exceptionPhase, throwable);
                    }
                },
                // 1 concurrent call, messages are auto-completed, auto-renew duration
                new MessageHandlerOptions(1, false, Duration.ofMinutes(1)),
                executorService);

    }