订阅带有 projectreactor 阻塞的 Flux Websocket 入站连接?

Subscribe to Flux Websocket inbound connection with projectreactor blocking?

在下面的代码中,IntelliJ 警告订阅不应在阻塞范围内调用。不幸的是,订阅似乎是将消费者与入站消息流相关联的最直观方式,是否有更好的方式?

Kotlin 中的代码片段,基于 projectreactor documentation 中的示例 Java 代码。

我想通过注入的消费者订阅入站消息,或者以其他消费者可以访问和订阅它的方式公开入站消息流量,我不希望这被阻止。

import io.netty.buffer.Unpooled
import io.netty.util.CharsetUtil
import reactor.core.publisher.Flux
import reactor.netty.http.client.HttpClient

fun main() {
    HttpClient.create()
        .websocket()
        .uri("wss://echo.websocket.org")
        .handle { inbound, outbound ->
            inbound.receive()
                .asString()
                .take(1)
                .subscribe(
                    { println(it) },
                    { println("error $it") },
                    { println("completed") }
                )

            val msgBytes = "hello".toByteArray(CharsetUtil.ISO_8859_1)
            outbound.send(Flux.just(Unpooled.wrappedBuffer(msgBytes))).neverComplete()
        }
        .blockLast()
}

我们找到了一种非阻塞的订阅替代方法。 thenzip。 Kotlin 中的示例。

import io.netty.buffer.Unpooled
import io.netty.util.CharsetUtil.UTF_8
import reactor.core.publisher.Flux
import reactor.netty.http.client.HttpClient

fun main() {
    val outgoingMessagesFlux = Flux.just(Unpooled.wrappedBuffer("hello".toByteArray(UTF_8)))

    HttpClient.create()
        .websocket()
        .uri("wss://echo.websocket.org")
        .handle { inbound, outbound ->
            val thenInbound = inbound.receive()
                .asString()
                .doOnNext { println(it) }
                .then()

            val thenOutbound = outbound.send(outgoingMessagesFlux).neverComplete()
            Flux.zip(thenInbound, thenOutbound).then()
        }.blockLast()
}

这是基于 Spring WebFlux Netty websocket client source code implementation and the current spring-framework documentation