订阅带有 projectreactor 阻塞的 Flux Websocket 入站连接?
Subscribe to Flux Websocket inbound connection with projectreactor blocking?
在下面的代码中,IntelliJ 警告订阅不应在阻塞范围内调用。不幸的是,订阅似乎是将消费者与入站消息流相关联的最直观方式,是否有更好的方式?
Kotlin 中的代码片段,基于 projectreactor documentation 中的示例 Java 代码。
我想通过注入的消费者订阅入站消息,或者以其他消费者可以访问和订阅它的方式公开入站消息流量,我不希望这被阻止。
import io.netty.buffer.Unpooled
import io.netty.util.CharsetUtil
import reactor.core.publisher.Flux
import reactor.netty.http.client.HttpClient
fun main() {
HttpClient.create()
.websocket()
.uri("wss://echo.websocket.org")
.handle { inbound, outbound ->
inbound.receive()
.asString()
.take(1)
.subscribe(
{ println(it) },
{ println("error $it") },
{ println("completed") }
)
val msgBytes = "hello".toByteArray(CharsetUtil.ISO_8859_1)
outbound.send(Flux.just(Unpooled.wrappedBuffer(msgBytes))).neverComplete()
}
.blockLast()
}
我们找到了一种非阻塞的订阅替代方法。 then
和 zip
。 Kotlin 中的示例。
import io.netty.buffer.Unpooled
import io.netty.util.CharsetUtil.UTF_8
import reactor.core.publisher.Flux
import reactor.netty.http.client.HttpClient
fun main() {
val outgoingMessagesFlux = Flux.just(Unpooled.wrappedBuffer("hello".toByteArray(UTF_8)))
HttpClient.create()
.websocket()
.uri("wss://echo.websocket.org")
.handle { inbound, outbound ->
val thenInbound = inbound.receive()
.asString()
.doOnNext { println(it) }
.then()
val thenOutbound = outbound.send(outgoingMessagesFlux).neverComplete()
Flux.zip(thenInbound, thenOutbound).then()
}.blockLast()
}
这是基于 Spring WebFlux Netty websocket client source code implementation and the current spring-framework documentation。
在下面的代码中,IntelliJ 警告订阅不应在阻塞范围内调用。不幸的是,订阅似乎是将消费者与入站消息流相关联的最直观方式,是否有更好的方式?
Kotlin 中的代码片段,基于 projectreactor documentation 中的示例 Java 代码。
我想通过注入的消费者订阅入站消息,或者以其他消费者可以访问和订阅它的方式公开入站消息流量,我不希望这被阻止。
import io.netty.buffer.Unpooled
import io.netty.util.CharsetUtil
import reactor.core.publisher.Flux
import reactor.netty.http.client.HttpClient
fun main() {
HttpClient.create()
.websocket()
.uri("wss://echo.websocket.org")
.handle { inbound, outbound ->
inbound.receive()
.asString()
.take(1)
.subscribe(
{ println(it) },
{ println("error $it") },
{ println("completed") }
)
val msgBytes = "hello".toByteArray(CharsetUtil.ISO_8859_1)
outbound.send(Flux.just(Unpooled.wrappedBuffer(msgBytes))).neverComplete()
}
.blockLast()
}
我们找到了一种非阻塞的订阅替代方法。 then
和 zip
。 Kotlin 中的示例。
import io.netty.buffer.Unpooled
import io.netty.util.CharsetUtil.UTF_8
import reactor.core.publisher.Flux
import reactor.netty.http.client.HttpClient
fun main() {
val outgoingMessagesFlux = Flux.just(Unpooled.wrappedBuffer("hello".toByteArray(UTF_8)))
HttpClient.create()
.websocket()
.uri("wss://echo.websocket.org")
.handle { inbound, outbound ->
val thenInbound = inbound.receive()
.asString()
.doOnNext { println(it) }
.then()
val thenOutbound = outbound.send(outgoingMessagesFlux).neverComplete()
Flux.zip(thenInbound, thenOutbound).then()
}.blockLast()
}
这是基于 Spring WebFlux Netty websocket client source code implementation and the current spring-framework documentation。