Spring 集成和响应式 WebSocket

Spring Integration and Reactive WebSockets

Spring 集成提供 non-reactive inbound/outbound WebSocket 适配器,简单地说,通过内部容器将 session 与 id 相关联,你对消息进行一些处理,然后出站,它检查消息 headers 的 session id,并通过 session.

发送它

现在,Spring 通过 org.springframework.web.reactive.socket.WebSocketSession 和其他 类 提供反应式 WebSocket 支持,我想知道在反应式 WebSocket 堆栈的通道适配器方面是否有类似的支持.

如果没有,是否有任何共同点 patterns/practices,如何将反应式 WS 与 spring-integration 消息流集成?

此功能尚未调用,因此我们尚未考虑此事。

拜托,看看我的SandBox。这是我能根据目前的情况提出的最好建议。

我们只是遵循标准 Spring WebFlux 建议来实现 WebSockets 解决方案。因此,我们有一个带有适当 URL 映射的 WebSocketHandler 实现。该实现只是将 Fluxsession.receive() 转发到动态注册的 IntegrationFlow 中。流程随后转换为用于 session.send().

的 Reactive Publisher

我相信可以使用许多其他方法,例如使用来自此 handle(WebSocketSession) 实现的 FluxMessageChannel bean 及其 subscribeTo()Flux 桥接到预定义的集成流中。或者来自 doOnNext().

的简单 @MessagingGateway 调用

虽然不确定 session.send() 是否可以在下游独立使用(需要播放),但您可以在示例中看到我如何将 WebSocketSession 传播到 MessageHeaders授予它在集成流程中的访问权限。

感谢您的见解,Artem,他们提供了很多帮助。

这是我最后做的事情:

从处理程序,我只是将收到的消息发送到我的频道接受它们:

public class FakeWebSocketHandler implements WebSocketHandler {
    @Override
    public Mono<Void> handle(WebSocketSession session) {
        session.receive()
            .map(wsm -> MessageBuilder.withPayload(wsm).build())
            .subscribe(subscriberChannel::send);

        return Mono.never();
    }
}

在下游,在我的流程结束时,在我最终的服务激活器中,我发送响应:

public class FakeResponder extends AbstractMessageHandler {
    @Override
    protected void handleMessageInternal(Message<?> message) {
        final WebSocketSession session = ...; // obtain WebSocketSession for this message

        session.send(Mono.just(message.getPayload())
            .map(this::convertToSomeByteRepresentation)
            .map(bb -> session.binaryMessage(dbf -> dbf.wrap(bb)))
        ).subscribe();
    }
}