如何确保 Reactive Stream 使用 Spring WebFlux 和 WebSockets 完成
How to ensure Reactive Stream completes with Spring WebFlux and WebSockets
我已经用 Kotlin 为 Spring WebFlux 编写了一个测试客户端和服务器。客户端向服务器发送一个数字(例如 4)并返回那么多数字(例如 0、1、2 和 3)。这是服务器实现:
class NumbersWebSocketHandler : WebSocketHandler {
override fun handle(session: WebSocketSession): Mono<Void> {
var index = 0
var count = 1
val publisher = Flux.generate<Int> { sink ->
if (index < count) {
sink.next(index)
index++
} else {
sink.complete()
}
}.map(Int::toString)
.map(session::textMessage)
.delayElements(Duration.ofMillis(500))
return session.receive()
.map(WebSocketMessage::getPayloadAsText)
.doOnNext {
println("About to send $it numbers")
count = it.toInt()
}
.then()
.and(session.send(publisher))
.then()
}
}
这是客户:
fun main(args: Array<String>) {
val uri = URI("ws://localhost:8080/numbers")
val client = ReactorNettyWebSocketClient()
println("How many numbers would you like?")
val input = Flux.just(readLine())
client.execute(uri) { session ->
session.send(input.map(session::textMessage))
.then(
session.receive()
.map(WebSocketMessage::getPayloadAsText)
.map { it.toInt() }
.reduce { a,b ->
println("Reduce called with $a and $b")
a + b
}
.doOnNext(::println)
.then()
)
.then()
}.block()
}
客户端成功接收号码并调用reduce如下:
Reduce called with 0 and 1
Reduce called with 1 and 2
Reduce called with 3 and 3
然而,对 doOnNext 的调用从未到达 - 可能是因为客户端不知道最后一项已发送。我的问题是我需要在客户端或服务器上添加什么代码才能打印总数?
更新: 在服务器端关闭会话没有帮助。我试过:
.delayElements(Duration.ofMillis(500))
.doOnComplete { session.close() }
还有:
.delayElements(Duration.ofMillis(500))
.doFinally { session.close() }
但对客户端的行为没有任何影响。在调用 'send':
之后,也不会尝试显式关闭会话
.and(session.send(publisher))
.then()
.and { session.close() }
.then()
从marble diagram for reduce
可以看出,它期望上游发布者发送一个onComplete
事件来自己发出一个事件。我不确定,但我认为只有在连接正常终止时才会发出。这就是 doOnNext
永远不会被执行的原因。
而不是 reduce
我认为你应该使用 subscribe()
来代替,并定义你自己的 Subscriber
实例,你可以在其中保持状态。
编辑: 你不能在这里使用 subscribe
方法。虽然如果服务器上的连接关闭它应该像这样工作:
.map(Int::toString)
.map(session::textMessage)
.delayElements(Duration.ofMillis(500))
.then(session::close)
从 WebSocketHandler
返回的 Mono<Void>
指示处理何时完成,这反过来指示 WebSocket 连接应保持打开状态多长时间。问题是双方返回的 Mono<Void>
永远不会完成。
客户端使用reduce
等待服务器端输入结束,而服务器使用receive()
+doOnNext
+then()
等待接收永远的消息。所以客户端和服务器都互相等待。在客户端添加take
有助于打破僵局:
client.execute(uri, session -> session
.send(input.map(session::textMessage))
.then(session.receive()
.map(WebSocketMessage::getPayloadAsText)
.map(Integer::valueOf)
.take(3)
.reduce((a,b) -> {
logger.debug("Reduce called with " + a + " and " + b);
return a + b;
})
.doOnNext(logger::debug)
)
.then()
).block();
第二个问题是服务器配置不正确。发送与通过 .and
接收并行触发。相反,发送流应该首先依赖于接收计数:
session.receive()
.map(WebSocketMessage::getPayloadAsText)
.flatMap(s -> {
logger.debug("About to send " + s + " numbers");
count.set(Integer.parseInt(s));
return session.send(publisher);
})
.then()
我已经用 Kotlin 为 Spring WebFlux 编写了一个测试客户端和服务器。客户端向服务器发送一个数字(例如 4)并返回那么多数字(例如 0、1、2 和 3)。这是服务器实现:
class NumbersWebSocketHandler : WebSocketHandler {
override fun handle(session: WebSocketSession): Mono<Void> {
var index = 0
var count = 1
val publisher = Flux.generate<Int> { sink ->
if (index < count) {
sink.next(index)
index++
} else {
sink.complete()
}
}.map(Int::toString)
.map(session::textMessage)
.delayElements(Duration.ofMillis(500))
return session.receive()
.map(WebSocketMessage::getPayloadAsText)
.doOnNext {
println("About to send $it numbers")
count = it.toInt()
}
.then()
.and(session.send(publisher))
.then()
}
}
这是客户:
fun main(args: Array<String>) {
val uri = URI("ws://localhost:8080/numbers")
val client = ReactorNettyWebSocketClient()
println("How many numbers would you like?")
val input = Flux.just(readLine())
client.execute(uri) { session ->
session.send(input.map(session::textMessage))
.then(
session.receive()
.map(WebSocketMessage::getPayloadAsText)
.map { it.toInt() }
.reduce { a,b ->
println("Reduce called with $a and $b")
a + b
}
.doOnNext(::println)
.then()
)
.then()
}.block()
}
客户端成功接收号码并调用reduce如下:
Reduce called with 0 and 1
Reduce called with 1 and 2
Reduce called with 3 and 3
然而,对 doOnNext 的调用从未到达 - 可能是因为客户端不知道最后一项已发送。我的问题是我需要在客户端或服务器上添加什么代码才能打印总数?
更新: 在服务器端关闭会话没有帮助。我试过:
.delayElements(Duration.ofMillis(500))
.doOnComplete { session.close() }
还有:
.delayElements(Duration.ofMillis(500))
.doFinally { session.close() }
但对客户端的行为没有任何影响。在调用 'send':
之后,也不会尝试显式关闭会话.and(session.send(publisher))
.then()
.and { session.close() }
.then()
从marble diagram for reduce
可以看出,它期望上游发布者发送一个onComplete
事件来自己发出一个事件。我不确定,但我认为只有在连接正常终止时才会发出。这就是 doOnNext
永远不会被执行的原因。
而不是 reduce
我认为你应该使用 subscribe()
来代替,并定义你自己的 Subscriber
实例,你可以在其中保持状态。
编辑: 你不能在这里使用 subscribe
方法。虽然如果服务器上的连接关闭它应该像这样工作:
.map(Int::toString)
.map(session::textMessage)
.delayElements(Duration.ofMillis(500))
.then(session::close)
从 WebSocketHandler
返回的 Mono<Void>
指示处理何时完成,这反过来指示 WebSocket 连接应保持打开状态多长时间。问题是双方返回的 Mono<Void>
永远不会完成。
客户端使用reduce
等待服务器端输入结束,而服务器使用receive()
+doOnNext
+then()
等待接收永远的消息。所以客户端和服务器都互相等待。在客户端添加take
有助于打破僵局:
client.execute(uri, session -> session
.send(input.map(session::textMessage))
.then(session.receive()
.map(WebSocketMessage::getPayloadAsText)
.map(Integer::valueOf)
.take(3)
.reduce((a,b) -> {
logger.debug("Reduce called with " + a + " and " + b);
return a + b;
})
.doOnNext(logger::debug)
)
.then()
).block();
第二个问题是服务器配置不正确。发送与通过 .and
接收并行触发。相反,发送流应该首先依赖于接收计数:
session.receive()
.map(WebSocketMessage::getPayloadAsText)
.flatMap(s -> {
logger.debug("About to send " + s + " numbers");
count.set(Integer.parseInt(s));
return session.send(publisher);
})
.then()