如何从 Spring WebFlux 中的多个 Flux (WebsocketSession::receive) 正确地将值发送到 Sink?

How to correctly emit values to Sink from multiple Fluxes (WebsocketSession::receive) in Spring WebFlux?

在我的简化案例中,我想将 WebSocket 客户端发送的消息广播到所有其他客户端。该应用程序是使用具有 Spring.

的反应式 websockets 构建的

我的想法是使用单 Sink 如果从客户端收到消息,则将其发送到此接收器。 WebsocketSession::send 仅将此 Sink 发出的事件转发给连接的客户端。

@Component
class ReactiveWebSocketHandler(private val sink: Sinks.Many<Message>,
                               private val objectMapper : ObjectMapper) : WebSocketHandler {

    override fun handle(session: WebSocketSession): Mono<Void> {

        val input = session.receive()
                .doOnNext {
                    sink.emitNext(fromJson(it.payloadAsText, Message::class.java), Sinks.EmitFailureHandler.FAIL_FAST)
                }
                .then()
        val output = session.send(sink.asFlux().map { message -> session.textMessage(toJson(message)) })

        return Mono.zip(input, output).then()
    }

    fun toJson(obj : Any) : String = objectMapper.writeValueAsString(obj)

    fun <T> fromJson(json : String, clazz : Class<T>) : T{
        return objectMapper.readValue(json, clazz)
    }

}

此实现不安全,因为 Sink.emitNext 可以从不同线程调用。

我的尝试是使用 publishOn 并传递单线程 Scheduler,以便从单个线程调用所有 WebSocketSessiononNext。然而 这是行不通的。一个项目从 websocket 客户端发出,然后所有后续的 websocket 客户端在连接后立即收到 onClose 事件:

@Component
class ReactiveWebSocketHandler(private val sink: Sinks.Many<Message>,
                               private val objectMapper : ObjectMapper) : WebSocketHandler {

    private val scheduler = Schedulers.newSingle("sink-scheduler")

    override fun handle(session: WebSocketSession): Mono<Void> {

        val input = session.receive()
                .publishOn(scheduler) // publish on single threaded scheduler
                .doOnNext {
                    sink.emitNext(fromJson(it.payloadAsText, Message::class.java), Sinks.EmitFailureHandler.FAIL_FAST)
                }
                .then()
        ...
    }

}

我能看到的另一个选项是 synchronize 在一些公共锁上,这样发射是线程安全的:

@Component
class ReactiveWebSocketHandler(private val sink: Sinks.Many<Message>,
                               private val objectMapper : ObjectMapper) : WebSocketHandler {

    private val lock = Any()

    override fun handle(session: WebSocketSession): Mono<Void> {

        val input = session.receive()
                .doOnNext {
                    synchronized(lock) {
                        sink.emitNext(fromJson(it.payloadAsText, Message::class.java), Sinks.EmitFailureHandler.FAIL_FAST)
                    }
                }
                .then()
        ...
    }


}

但是我不确定是否应该那样做。

问题是

在这种情况下是否可以使用 publishOn 以便发射是线程安全的,如果不是,还有什么是解决这个问题的其他方法(除了像我用 synchronized 关键字所做的那样使用同步).

而不是使用 synchronized 选项的悲观锁定,您可以创建一个 EmitFailureHandlerFAIL_FAST 相当,除了 returns true for [=14] =].

这将导致立即重试并发发射尝试,就像在繁忙的循环中一样。

乐观地说,这最终会成功。如果您想对无限循环进行额外防御,您甚至可以让自定义处理程序引入延迟或限制它的次数 returns true

publishOn 单线程调度程序方法应该可行,但您需要为每个 ReactiveWebSocketHandler.

使用相同的调度程序实例

您是否可以使用 flatMap 而不是使用 Sink 组合所有 receive() Flux?

我自己对这个问题的解决方案采用了繁忙的旋转方法

查看我对 similar question 的回答。

除了 @simon-baslé 的回答之外,这里还有示例代码(用于 srping-webflux)。它会将请求下游到订阅者,如果 Sinks.EmitResult.FAIL_NON_SERIALIZED 响应将重试。 这是 Sinks.EmitFailureHandler 定义:

private final Sinks.EmitFailureHandler emitFailureHandler = (signalType, emitResult) -> emitResult
            .equals(Sinks.EmitResult.FAIL_NON_SERIALIZED) ? true : false;

这是将处理请求的控制器:

@org.springframework.web.bind.annotation.RestController
public class RestController {

    private final Many<String> sink = Sinks.many().multicast().directBestEffort();
    private final Sinks.EmitFailureHandler emitFailureHandler = (signalType, emitResult) -> emitResult
            .equals(Sinks.EmitResult.FAIL_NON_SERIALIZED) ? true : false;
    @Autowired
    public RestController(ServiceSubscriber serviceSubscriber) {
        sink.asFlux().subscribe(serviceSubscriber);
    }

    @GetMapping(path = "/{id}")
    public Mono<ResponseEntity<Void>> getData(@PathVariable String id) {
        return Mono.fromCallable(() -> {
            sink.emitNext(id, emitFailureHandler);
            return ResponseEntity.ok().<Void>build();
        });
    }
}