如何从 Spring WebFlux 中的多个 Flux (WebsocketSession::receive) 正确地将值发送到 Sink?
How to correctly emit values to Sink from multiple Fluxes (WebsocketSession::receive) in Spring WebFlux?
在我的简化案例中,我想将 WebSocket 客户端发送的消息广播到所有其他客户端。该应用程序是使用具有 Spring.
的反应式 websockets 构建的
我的想法是使用单
Sink
如果从客户端收到消息,则将其发送到此接收器。 WebsocketSession::send
仅将此 Sink
发出的事件转发给连接的客户端。
@Component
class ReactiveWebSocketHandler(private val sink: Sinks.Many<Message>,
private val objectMapper : ObjectMapper) : WebSocketHandler {
override fun handle(session: WebSocketSession): Mono<Void> {
val input = session.receive()
.doOnNext {
sink.emitNext(fromJson(it.payloadAsText, Message::class.java), Sinks.EmitFailureHandler.FAIL_FAST)
}
.then()
val output = session.send(sink.asFlux().map { message -> session.textMessage(toJson(message)) })
return Mono.zip(input, output).then()
}
fun toJson(obj : Any) : String = objectMapper.writeValueAsString(obj)
fun <T> fromJson(json : String, clazz : Class<T>) : T{
return objectMapper.readValue(json, clazz)
}
}
此实现不安全,因为 Sink.emitNext
可以从不同线程调用。
我的尝试是使用 publishOn
并传递单线程 Scheduler
,以便从单个线程调用所有 WebSocketSession
的 onNext
。然而
这是行不通的。一个项目从 websocket 客户端发出,然后所有后续的 websocket 客户端在连接后立即收到 onClose 事件:
@Component
class ReactiveWebSocketHandler(private val sink: Sinks.Many<Message>,
private val objectMapper : ObjectMapper) : WebSocketHandler {
private val scheduler = Schedulers.newSingle("sink-scheduler")
override fun handle(session: WebSocketSession): Mono<Void> {
val input = session.receive()
.publishOn(scheduler) // publish on single threaded scheduler
.doOnNext {
sink.emitNext(fromJson(it.payloadAsText, Message::class.java), Sinks.EmitFailureHandler.FAIL_FAST)
}
.then()
...
}
}
我能看到的另一个选项是 synchronize
在一些公共锁上,这样发射是线程安全的:
@Component
class ReactiveWebSocketHandler(private val sink: Sinks.Many<Message>,
private val objectMapper : ObjectMapper) : WebSocketHandler {
private val lock = Any()
override fun handle(session: WebSocketSession): Mono<Void> {
val input = session.receive()
.doOnNext {
synchronized(lock) {
sink.emitNext(fromJson(it.payloadAsText, Message::class.java), Sinks.EmitFailureHandler.FAIL_FAST)
}
}
.then()
...
}
}
但是我不确定是否应该那样做。
问题是
在这种情况下是否可以使用 publishOn
以便发射是线程安全的,如果不是,还有什么是解决这个问题的其他方法(除了像我用 synchronized
关键字所做的那样使用同步).
而不是使用 synchronized
选项的悲观锁定,您可以创建一个 EmitFailureHandler
与 FAIL_FAST
相当,除了 returns true
for [=14] =].
这将导致立即重试并发发射尝试,就像在繁忙的循环中一样。
乐观地说,这最终会成功。如果您想对无限循环进行额外防御,您甚至可以让自定义处理程序引入延迟或限制它的次数 returns true
。
publishOn 单线程调度程序方法应该可行,但您需要为每个 ReactiveWebSocketHandler
.
使用相同的调度程序实例
您是否可以使用 flatMap 而不是使用 Sink 组合所有 receive() Flux?
我自己对这个问题的解决方案采用了繁忙的旋转方法 。
查看我对 similar question 的回答。
除了 @simon-baslé 的回答之外,这里还有示例代码(用于 srping-webflux)。它会将请求下游到订阅者,如果 Sinks.EmitResult.FAIL_NON_SERIALIZED
响应将重试。
这是 Sinks.EmitFailureHandler
定义:
private final Sinks.EmitFailureHandler emitFailureHandler = (signalType, emitResult) -> emitResult
.equals(Sinks.EmitResult.FAIL_NON_SERIALIZED) ? true : false;
这是将处理请求的控制器:
@org.springframework.web.bind.annotation.RestController
public class RestController {
private final Many<String> sink = Sinks.many().multicast().directBestEffort();
private final Sinks.EmitFailureHandler emitFailureHandler = (signalType, emitResult) -> emitResult
.equals(Sinks.EmitResult.FAIL_NON_SERIALIZED) ? true : false;
@Autowired
public RestController(ServiceSubscriber serviceSubscriber) {
sink.asFlux().subscribe(serviceSubscriber);
}
@GetMapping(path = "/{id}")
public Mono<ResponseEntity<Void>> getData(@PathVariable String id) {
return Mono.fromCallable(() -> {
sink.emitNext(id, emitFailureHandler);
return ResponseEntity.ok().<Void>build();
});
}
}
在我的简化案例中,我想将 WebSocket 客户端发送的消息广播到所有其他客户端。该应用程序是使用具有 Spring.
的反应式 websockets 构建的我的想法是使用单
Sink
如果从客户端收到消息,则将其发送到此接收器。 WebsocketSession::send
仅将此 Sink
发出的事件转发给连接的客户端。
@Component
class ReactiveWebSocketHandler(private val sink: Sinks.Many<Message>,
private val objectMapper : ObjectMapper) : WebSocketHandler {
override fun handle(session: WebSocketSession): Mono<Void> {
val input = session.receive()
.doOnNext {
sink.emitNext(fromJson(it.payloadAsText, Message::class.java), Sinks.EmitFailureHandler.FAIL_FAST)
}
.then()
val output = session.send(sink.asFlux().map { message -> session.textMessage(toJson(message)) })
return Mono.zip(input, output).then()
}
fun toJson(obj : Any) : String = objectMapper.writeValueAsString(obj)
fun <T> fromJson(json : String, clazz : Class<T>) : T{
return objectMapper.readValue(json, clazz)
}
}
此实现不安全,因为 Sink.emitNext
可以从不同线程调用。
我的尝试是使用 publishOn
并传递单线程 Scheduler
,以便从单个线程调用所有 WebSocketSession
的 onNext
。然而
这是行不通的。一个项目从 websocket 客户端发出,然后所有后续的 websocket 客户端在连接后立即收到 onClose 事件:
@Component
class ReactiveWebSocketHandler(private val sink: Sinks.Many<Message>,
private val objectMapper : ObjectMapper) : WebSocketHandler {
private val scheduler = Schedulers.newSingle("sink-scheduler")
override fun handle(session: WebSocketSession): Mono<Void> {
val input = session.receive()
.publishOn(scheduler) // publish on single threaded scheduler
.doOnNext {
sink.emitNext(fromJson(it.payloadAsText, Message::class.java), Sinks.EmitFailureHandler.FAIL_FAST)
}
.then()
...
}
}
我能看到的另一个选项是 synchronize
在一些公共锁上,这样发射是线程安全的:
@Component
class ReactiveWebSocketHandler(private val sink: Sinks.Many<Message>,
private val objectMapper : ObjectMapper) : WebSocketHandler {
private val lock = Any()
override fun handle(session: WebSocketSession): Mono<Void> {
val input = session.receive()
.doOnNext {
synchronized(lock) {
sink.emitNext(fromJson(it.payloadAsText, Message::class.java), Sinks.EmitFailureHandler.FAIL_FAST)
}
}
.then()
...
}
}
但是我不确定是否应该那样做。
问题是
在这种情况下是否可以使用 publishOn
以便发射是线程安全的,如果不是,还有什么是解决这个问题的其他方法(除了像我用 synchronized
关键字所做的那样使用同步).
而不是使用 synchronized
选项的悲观锁定,您可以创建一个 EmitFailureHandler
与 FAIL_FAST
相当,除了 returns true
for [=14] =].
这将导致立即重试并发发射尝试,就像在繁忙的循环中一样。
乐观地说,这最终会成功。如果您想对无限循环进行额外防御,您甚至可以让自定义处理程序引入延迟或限制它的次数 returns true
。
publishOn 单线程调度程序方法应该可行,但您需要为每个 ReactiveWebSocketHandler
.
您是否可以使用 flatMap 而不是使用 Sink 组合所有 receive() Flux?
我自己对这个问题的解决方案采用了繁忙的旋转方法
查看我对 similar question 的回答。
除了 @simon-baslé 的回答之外,这里还有示例代码(用于 srping-webflux)。它会将请求下游到订阅者,如果 Sinks.EmitResult.FAIL_NON_SERIALIZED
响应将重试。
这是 Sinks.EmitFailureHandler
定义:
private final Sinks.EmitFailureHandler emitFailureHandler = (signalType, emitResult) -> emitResult
.equals(Sinks.EmitResult.FAIL_NON_SERIALIZED) ? true : false;
这是将处理请求的控制器:
@org.springframework.web.bind.annotation.RestController
public class RestController {
private final Many<String> sink = Sinks.many().multicast().directBestEffort();
private final Sinks.EmitFailureHandler emitFailureHandler = (signalType, emitResult) -> emitResult
.equals(Sinks.EmitResult.FAIL_NON_SERIALIZED) ? true : false;
@Autowired
public RestController(ServiceSubscriber serviceSubscriber) {
sink.asFlux().subscribe(serviceSubscriber);
}
@GetMapping(path = "/{id}")
public Mono<ResponseEntity<Void>> getData(@PathVariable String id) {
return Mono.fromCallable(() -> {
sink.emitNext(id, emitFailureHandler);
return ResponseEntity.ok().<Void>build();
});
}
}