如何确保 Reactive Stream 使用 Spring WebFlux 和 WebSockets 完成

How to ensure Reactive Stream completes with Spring WebFlux and WebSockets

我已经用 Kotlin 为 Spring WebFlux 编写了一个测试客户端和服务器。客户端向服务器发送一个数字(例如 4)并返回那么多数字(例如 0、1、2 和 3)。这是服务器实现:

class NumbersWebSocketHandler : WebSocketHandler {
    override fun handle(session: WebSocketSession): Mono<Void> {
        var index = 0
        var count = 1
        val publisher = Flux.generate<Int> { sink ->
            if (index < count) {
                sink.next(index)
                index++
            } else {
                sink.complete()
            }
        }.map(Int::toString)
            .map(session::textMessage)
            .delayElements(Duration.ofMillis(500))

        return session.receive()
            .map(WebSocketMessage::getPayloadAsText)
            .doOnNext {
                println("About to send $it numbers")
                count = it.toInt()
            }
            .then()
            .and(session.send(publisher))
            .then()
    }
}

这是客户:

fun main(args: Array<String>) {
    val uri = URI("ws://localhost:8080/numbers")
    val client = ReactorNettyWebSocketClient()

    println("How many numbers would you like?")
    val input = Flux.just(readLine())

    client.execute(uri) { session ->
        session.send(input.map(session::textMessage))
            .then(
                session.receive()
                    .map(WebSocketMessage::getPayloadAsText)
                    .map { it.toInt() }
                    .reduce { a,b ->
                        println("Reduce called with $a and $b")
                        a + b
                    }
                    .doOnNext(::println)
                    .then()
            )
            .then()
    }.block()
}

客户端成功接收号码并调用reduce如下:

Reduce called with 0 and 1

Reduce called with 1 and 2

Reduce called with 3 and 3

然而,对 doOnNext 的调用从未到达 - 可能是因为客户端不知道最后一项已发送。我的问题是我需要在客户端或服务器上添加什么代码才能打印总数?

更新: 在服务器端关闭会话没有帮助。我试过:

.delayElements(Duration.ofMillis(500))
.doOnComplete { session.close() }

还有:

.delayElements(Duration.ofMillis(500))
.doFinally { session.close() }

但对客户端的行为没有任何影响。在调用 'send':

之后,也不会尝试显式关闭会话
.and(session.send(publisher))
.then()
.and { session.close() }
.then()

marble diagram for reduce可以看出,它期望上游发布者发送一个onComplete事件来自己发出一个事件。我不确定,但我认为只有在连接正常终止时才会发出。这就是 doOnNext 永远不会被执行的原因。

而不是 reduce 我认为你应该使用 subscribe() 来代替,并定义你自己的 Subscriber 实例,你可以在其中保持状态。

编辑: 你不能在这里使用 subscribe 方法。虽然如果服务器上的连接关闭它应该像这样工作:

.map(Int::toString)
        .map(session::textMessage)
        .delayElements(Duration.ofMillis(500))
        .then(session::close)

WebSocketHandler 返回的 Mono<Void> 指示处理何时完成,这反过来指示 WebSocket 连接应保持打开状态多长时间。问题是双方返回的 Mono<Void> 永远不会完成。

客户端使用reduce等待服务器端输入结束,而服务器使用receive()+doOnNext+then()等待接收永远的消息。所以客户端和服务器都互相等待。在客户端添加take有助于打破僵局:

client.execute(uri, session -> session
        .send(input.map(session::textMessage))
        .then(session.receive()
                .map(WebSocketMessage::getPayloadAsText)
                .map(Integer::valueOf)
                .take(3)
                .reduce((a,b) -> {
                    logger.debug("Reduce called with " + a + " and " + b);
                    return a + b;
                })
                .doOnNext(logger::debug)
        )
        .then()
).block();

第二个问题是服务器配置不正确。发送与通过 .and 接收并行触发。相反,发送流应该首先依赖于接收计数:

session.receive()
        .map(WebSocketMessage::getPayloadAsText)
        .flatMap(s -> {
            logger.debug("About to send " + s + " numbers");
            count.set(Integer.parseInt(s));
            return session.send(publisher);
        })
        .then()