使用 apache qpid 通过 amqpwss 连接到 azure 服务总线端口 443
Connecting to azure service bus port 443 with amqpwss using apache qpid
我正在尝试使用后台 amqp-10-jms-spring-boot starter (which I understand to use apache qpid jms 连接到 Azure 服务总线。
当我设置连接字符串 (amqphub.amqp10jms.remote-url) 以使用 'amqps://' 一切正常时,一条 hello world 消息被发送到 Azure 队列,然后被检索& 由应用程序打印。
然而,当我使用“amqpwss://[Endpoint]:443”连接字符串时,我收到一个异常...
Caused by: io.netty.handler.codec.http.websocketx.WebSocketHandshakeException: Invalid handshake response getStatus: 400 This service does not support WebSocket connections.
我怀疑 Azure 服务总线不支持 WebSocket 连接。
- amqphub 文档声称这是可能的:https://github.com/amqphub/amqp-10-jms-spring-boot#jms-connection-configuration
- Apache quid 文档也是如此:http://qpid.apache.org/releases/qpid-jms-0.52.0/docs/index.html
- 服务总线文档“AMQP WebSockets 绑定在 TCP 端口 443 上创建隧道,然后相当于 AMQP 5671 连接。”:https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-amqp-protocol-guide#connections-and-sessions
如何从 Spring 引导应用程序建立到 Azure 服务总线的 amqpwss 连接?(我会 喜欢 使用 qpid 但我不依赖它)。
示例代码在这里:https://github.com/kevvvvyp/amqp-websocket-apache-qpid
在尝试追加销售并再次查看文档后,我再次证明了 Microsoft 非常有兴趣接纳和供应商将人们锁定在他们的花园中。
撇开长篇大论,URL 连接到 websockets 端点与常规 AMQP 端点不同。假设你有以下
amqps://[endpoint]
要使用 websockets 连接,您需要将协议替换为 amqpwss,并将端口替换为 443
amqpwss://[endpoint]:443
但是,没有提到的是,您还必须指定 /$servicebus/websocket
的路径。结果,最后的url是
amqpwss://[endpoint]:443/$servicebus/websocket
我不知道你应该如何解决这个问题。请证明我是错的,并指出截至 2021 年 3 月 18 日文档中确实包含这一信息的部分。无论如何,我是通过 this SO question connecting from browser via websockets to azure service bus 找到的。
虽然我相信 Dragas 的回答是正确的解决方案,但我认为如果它对任何人有用,我会添加我采用的方法。
我最终使用了 Java azure-service-bus library as I spotted it has a dependency on qpid-proton-j-extensions ,他们自己的 Apache Qpid 扩展库。回购描述为“扩展 qpid-proton-j 库以通过 WEBSOCKETS 讨论 AMQP”......所以我认为这可以完成工作!
我使用 non-descructive read...
创建了一个订阅客户端
/**
* Connect & start listening to the azure service bus topic.
*/
public void start() {
listeningTask = taskExecutor.submit(() -> {
try {
SubscriptionClient subscriptionClient = new SubscriptionClient(connectionString, ReceiveMode.PEEKLOCK);
ExecutorService receiveExecutor = Executors.newCachedThreadPool();
registerMessageHandlerOnClient(subscriptionClient, receiveExecutor);
} catch (Exception e) {
log.error("Caught exception", e);
}
});
}
订阅了响应,通过网关将任何消息发送到我的 Spring 集成流中。我添加了一些可配置的属性以在消息消费之间应用回退...
/**
* Azure service bus listener.
*
* @param receiveClient client
* @param executorService executorService
* @throws Exception If we cannot poll queue.
*/
private void registerMessageHandlerOnClient(SubscriptionClient receiveClient, ExecutorService executorService) throws Exception {
// register the RegisterMessageHandler callback
receiveClient.registerMessageHandler(
new IMessageHandler() {
// callback invoked when the message handler loop has obtained a message
public CompletableFuture<Void> onMessageAsync(IMessage message) {
log.debug("Message received from azure, id: {}", message.getMessageId()); //TODO deprecation alternative
brokerGateway.send(message.getBody());
try {
Thread.sleep(backOff.toMillis());
} catch (InterruptedException e) {
log.error("Failed to apply azure backoff", e);
}
return CompletableFuture.completedFuture(null);
}
// callback invoked when the message handler has an exception to report
public void notifyException(Throwable throwable, ExceptionPhase exceptionPhase) {
log.error("Exception {}", exceptionPhase, throwable);
}
},
// 1 concurrent call, messages are auto-completed, auto-renew duration
new MessageHandlerOptions(1, false, Duration.ofMinutes(1)),
executorService);
}
我正在尝试使用后台 amqp-10-jms-spring-boot starter (which I understand to use apache qpid jms 连接到 Azure 服务总线。
当我设置连接字符串 (amqphub.amqp10jms.remote-url) 以使用 'amqps://' 一切正常时,一条 hello world 消息被发送到 Azure 队列,然后被检索& 由应用程序打印。
然而,当我使用“amqpwss://[Endpoint]:443”连接字符串时,我收到一个异常...
Caused by: io.netty.handler.codec.http.websocketx.WebSocketHandshakeException: Invalid handshake response getStatus: 400 This service does not support WebSocket connections.
我怀疑 Azure 服务总线不支持 WebSocket 连接。
- amqphub 文档声称这是可能的:https://github.com/amqphub/amqp-10-jms-spring-boot#jms-connection-configuration
- Apache quid 文档也是如此:http://qpid.apache.org/releases/qpid-jms-0.52.0/docs/index.html
- 服务总线文档“AMQP WebSockets 绑定在 TCP 端口 443 上创建隧道,然后相当于 AMQP 5671 连接。”:https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-amqp-protocol-guide#connections-and-sessions
如何从 Spring 引导应用程序建立到 Azure 服务总线的 amqpwss 连接?(我会 喜欢 使用 qpid 但我不依赖它)。
示例代码在这里:https://github.com/kevvvvyp/amqp-websocket-apache-qpid
在尝试追加销售并再次查看文档后,我再次证明了 Microsoft 非常有兴趣接纳和供应商将人们锁定在他们的花园中。
撇开长篇大论,URL 连接到 websockets 端点与常规 AMQP 端点不同。假设你有以下
amqps://[endpoint]
要使用 websockets 连接,您需要将协议替换为 amqpwss,并将端口替换为 443
amqpwss://[endpoint]:443
但是,没有提到的是,您还必须指定 /$servicebus/websocket
的路径。结果,最后的url是
amqpwss://[endpoint]:443/$servicebus/websocket
我不知道你应该如何解决这个问题。请证明我是错的,并指出截至 2021 年 3 月 18 日文档中确实包含这一信息的部分。无论如何,我是通过 this SO question connecting from browser via websockets to azure service bus 找到的。
虽然我相信 Dragas 的回答是正确的解决方案,但我认为如果它对任何人有用,我会添加我采用的方法。
我最终使用了 Java azure-service-bus library as I spotted it has a dependency on qpid-proton-j-extensions ,他们自己的 Apache Qpid 扩展库。回购描述为“扩展 qpid-proton-j 库以通过 WEBSOCKETS 讨论 AMQP”......所以我认为这可以完成工作!
我使用 non-descructive read...
创建了一个订阅客户端 /**
* Connect & start listening to the azure service bus topic.
*/
public void start() {
listeningTask = taskExecutor.submit(() -> {
try {
SubscriptionClient subscriptionClient = new SubscriptionClient(connectionString, ReceiveMode.PEEKLOCK);
ExecutorService receiveExecutor = Executors.newCachedThreadPool();
registerMessageHandlerOnClient(subscriptionClient, receiveExecutor);
} catch (Exception e) {
log.error("Caught exception", e);
}
});
}
订阅了响应,通过网关将任何消息发送到我的 Spring 集成流中。我添加了一些可配置的属性以在消息消费之间应用回退...
/**
* Azure service bus listener.
*
* @param receiveClient client
* @param executorService executorService
* @throws Exception If we cannot poll queue.
*/
private void registerMessageHandlerOnClient(SubscriptionClient receiveClient, ExecutorService executorService) throws Exception {
// register the RegisterMessageHandler callback
receiveClient.registerMessageHandler(
new IMessageHandler() {
// callback invoked when the message handler loop has obtained a message
public CompletableFuture<Void> onMessageAsync(IMessage message) {
log.debug("Message received from azure, id: {}", message.getMessageId()); //TODO deprecation alternative
brokerGateway.send(message.getBody());
try {
Thread.sleep(backOff.toMillis());
} catch (InterruptedException e) {
log.error("Failed to apply azure backoff", e);
}
return CompletableFuture.completedFuture(null);
}
// callback invoked when the message handler has an exception to report
public void notifyException(Throwable throwable, ExceptionPhase exceptionPhase) {
log.error("Exception {}", exceptionPhase, throwable);
}
},
// 1 concurrent call, messages are auto-completed, auto-renew duration
new MessageHandlerOptions(1, false, Duration.ofMinutes(1)),
executorService);
}