在 Spring 引导应用程序中使用多个 Pods 启用 WebSocket 连接
Enable WebSocket Connections with multiple Pods in Spring Boot Application
我在 spring 启动应用程序中使用网络套接字协议。使用了多个 pods 来处理繁忙的流量。现在,有多个 pods 会导致问题。让我简单介绍一下,
假设有 2 个 pods(Pod 1,Pod 2)。 Angular UI 正在通过网络套接字协议订阅 spring 启动应用程序,比方说通过 Pod 1。现在,spring 启动应用程序向 UI,假设它是通过 Pod 2 发送的,并且由于网络套接字连接是通过 Pod 1 建立的,所以这条消息正在被丢弃(永远不会到达 UI)。
因此,一些消息被丢弃,这些消息被其他 Pods 发送到 UI(未用于初始订阅过程),并且消息通过 Pod 发送最初用于订阅,仅在 UI.
收到这些消息
如何解决这种情况,以便在这种多重 pods 环境中将每条消息发送到 UI?
多个 pod 问题的解决方案是 using an external message broker
(如 RabbitMq、ActiveMq),而不是内存中的消息代理(默认行为)。
你在实现这个的时候可能会遇到以下问题(把它们写在一个地方,这样你就不用像我一样费力了),
- 创建自动删除队列
使用外部消息代理时,您可能会观察到为每个 websocket 连接创建队列,但当 websocket 连接结束时它们不会被删除。
我们甚至不需要这些队列。因此需要自动删除队列。当 websocket 连接关闭时,这些自动队列会自动删除。
如何声明自动删除队列,很简单
When using user destinations with an external message broker, check the broker documentation on how to manage inactive queues, so that when the user session is over, all unique user queues are removed. For example, RabbitMQ creates auto-delete queues when destinations like /exchange/amq.direct/position-updates are used. So in that case the client could subscribe to /user/exchange/amq.direct/position-updates. Similarly, ActiveMQ has configuration options for purging inactive destinations.
简单来说,websocket 客户端和 websocket 服务器应该使用 /exchange/amq.direct/<anything>
这个交换目的地。
有关详细信息,请阅读 official docs
ssl/stomp 云实例协议
当您将应用程序托管到 AWS 或 Azure 或 Google 云时,您可能面临的另一个问题是它们使用 ssl/stomp
协议,因此您编写的代码在本地机器上运行良好(因为它使用 stomp
协议)在云中无法正常工作。
从一个 pod 向另一个 pod 广播消息 pods
此问题与此 Whosebug 问题中所写的相同。 [参考问题以获得许可]
现在,lemme 放出代码片段并将添加注释以指示代码片段的哪一部分解决了哪个问题。将其添加到您的 configureMessageBroker
方法中,
val tcpClient = new ReactorNettyTcpClient<>
(TcpClient.create()
.port(yourRabbitmqCloudStompPort)
.host(yourRabbitmqCloudHost)
.secure(SslProvider.defaultClientProvider()),
new StompReactorNettyCodec());
messageBrokerRegistry
// enables stompbroker, instead of in-memory broker
.enableStompBroker("/queue", "/topic", "/exchange")
.setClientLogin(yourRabbitmqCloudClientLogin)
.setClientPasscode(yourRabbitmqCloudClientPasscode)
.setSystemLogin(yourRabbitmqCloudSystemLogin)
.setSystemPasscode(yourRabbitmqCloudSystemPasscode)
// broadcast msg to every pod
.setUserDestinationBroadcast("/topic/unresolved-user-destination")
.setUserRegistryBroadcasr("/topic/user-registry")
// enables ssl/stomp protocol
.setTcpClient(tcpClient);
我在 spring 启动应用程序中使用网络套接字协议。使用了多个 pods 来处理繁忙的流量。现在,有多个 pods 会导致问题。让我简单介绍一下,
假设有 2 个 pods(Pod 1,Pod 2)。 Angular UI 正在通过网络套接字协议订阅 spring 启动应用程序,比方说通过 Pod 1。现在,spring 启动应用程序向 UI,假设它是通过 Pod 2 发送的,并且由于网络套接字连接是通过 Pod 1 建立的,所以这条消息正在被丢弃(永远不会到达 UI)。
因此,一些消息被丢弃,这些消息被其他 Pods 发送到 UI(未用于初始订阅过程),并且消息通过 Pod 发送最初用于订阅,仅在 UI.
收到这些消息如何解决这种情况,以便在这种多重 pods 环境中将每条消息发送到 UI?
多个 pod 问题的解决方案是 using an external message broker
(如 RabbitMq、ActiveMq),而不是内存中的消息代理(默认行为)。
你在实现这个的时候可能会遇到以下问题(把它们写在一个地方,这样你就不用像我一样费力了),
- 创建自动删除队列
使用外部消息代理时,您可能会观察到为每个 websocket 连接创建队列,但当 websocket 连接结束时它们不会被删除。
我们甚至不需要这些队列。因此需要自动删除队列。当 websocket 连接关闭时,这些自动队列会自动删除。
如何声明自动删除队列,很简单
When using user destinations with an external message broker, check the broker documentation on how to manage inactive queues, so that when the user session is over, all unique user queues are removed. For example, RabbitMQ creates auto-delete queues when destinations like /exchange/amq.direct/position-updates are used. So in that case the client could subscribe to /user/exchange/amq.direct/position-updates. Similarly, ActiveMQ has configuration options for purging inactive destinations.
简单来说,websocket 客户端和 websocket 服务器应该使用 /exchange/amq.direct/<anything>
这个交换目的地。
有关详细信息,请阅读 official docs
ssl/stomp 云实例协议
当您将应用程序托管到 AWS 或 Azure 或 Google 云时,您可能面临的另一个问题是它们使用ssl/stomp
协议,因此您编写的代码在本地机器上运行良好(因为它使用stomp
协议)在云中无法正常工作。从一个 pod 向另一个 pod 广播消息 pods
此问题与此 Whosebug 问题中所写的相同。 [参考问题以获得许可]
现在,lemme 放出代码片段并将添加注释以指示代码片段的哪一部分解决了哪个问题。将其添加到您的 configureMessageBroker
方法中,
val tcpClient = new ReactorNettyTcpClient<>
(TcpClient.create()
.port(yourRabbitmqCloudStompPort)
.host(yourRabbitmqCloudHost)
.secure(SslProvider.defaultClientProvider()),
new StompReactorNettyCodec());
messageBrokerRegistry
// enables stompbroker, instead of in-memory broker
.enableStompBroker("/queue", "/topic", "/exchange")
.setClientLogin(yourRabbitmqCloudClientLogin)
.setClientPasscode(yourRabbitmqCloudClientPasscode)
.setSystemLogin(yourRabbitmqCloudSystemLogin)
.setSystemPasscode(yourRabbitmqCloudSystemPasscode)
// broadcast msg to every pod
.setUserDestinationBroadcast("/topic/unresolved-user-destination")
.setUserRegistryBroadcasr("/topic/user-registry")
// enables ssl/stomp protocol
.setTcpClient(tcpClient);