Reactor Netty - 如何使用延迟的 Flux 发送
Reactor Netty - how to send with delayed Flux
在 Reactor Netty 中,当通过 out.send(publisher)
向 TCP 通道发送数据时,我们希望任何发布者都能工作。但是,如果我们使用带有延迟元素的更复杂的立即数 Flux
而不是简单的立即数,那么它将停止正常工作。
例如,如果我们使用这个 hello world TCP 回显服务器,它会按预期工作:
import reactor.core.publisher.Flux;
import reactor.netty.DisposableServer;
import reactor.netty.tcp.TcpServer;
import java.time.Duration;
public class Reactor1 {
public static void main(String[] args) throws Exception {
DisposableServer server = TcpServer.create()
.port(3344)
.handle((in, out) -> in
.receive()
.asString()
.flatMap(s ->
out.sendString(Flux.just(s.toUpperCase()))
))
.bind()
.block();
server.channel().closeFuture().sync();
}
}
但是,如果我们将 out.sendString
更改为
out.sendString(Flux.just(s.toUpperCase()).delayElements(Duration.ofSeconds(1)))
那么我们预计对于每个接收到的项目,都会产生一个延迟一秒的输出。
但是,服务器的行为方式是,如果它在间隔期间接收到多个项目,它将只为第一个项目产生输出。例如,下面我们在第一秒输入 aa
和 bb
,但只有 AA
作为输出(一秒后)产生:
$ nc localhost 3344
aa
bb
AA <after one second>
然后,如果我们稍后输入额外的行,我们会得到输出(一秒后),但是来自之前的输入:
cc
BB <after one second>
有什么想法可以使 send()
在延迟 Flux
的情况下按预期工作吗?
我认为您不应该为 out.sendString(...)
重新创建发布者
这有效:
DisposableServer server = TcpServer.create()
.port(3344)
.handle((in, out) -> out
.options(NettyPipeline.SendOptions::flushOnEach)
.sendString(in.receive()
.asString()
.map(String::toUpperCase)
.delayElements(Duration.ofSeconds(1))))
.bind()
.block();
server.channel().closeFuture().sync();
尝试使用concatMap。这有效:
DisposableServer server = TcpServer.create()
.port(3344)
.handle((in, out) -> in
.receive()
.asString()
.concatMap(s ->
out.sendString(Flux.just(s.toUpperCase())
.delayElements(Duration.ofSeconds(1)))
))
.bind()
.block();
server.channel().closeFuture().sync();
延迟传入流量
DisposableServer server = TcpServer.create()
.port(3344)
.handle((in, out) -> in
.receive()
.asString()
.timestamp()
.delayElements(Duration.ofSeconds(1))
.concatMap(tuple2 ->
out.sendString(
Flux.just(tuple2.getT2().toUpperCase() +
" " +
(System.currentTimeMillis() - tuple2.getT1())
))
))
.bind()
.block();
在 Reactor Netty 中,当通过 out.send(publisher)
向 TCP 通道发送数据时,我们希望任何发布者都能工作。但是,如果我们使用带有延迟元素的更复杂的立即数 Flux
而不是简单的立即数,那么它将停止正常工作。
例如,如果我们使用这个 hello world TCP 回显服务器,它会按预期工作:
import reactor.core.publisher.Flux;
import reactor.netty.DisposableServer;
import reactor.netty.tcp.TcpServer;
import java.time.Duration;
public class Reactor1 {
public static void main(String[] args) throws Exception {
DisposableServer server = TcpServer.create()
.port(3344)
.handle((in, out) -> in
.receive()
.asString()
.flatMap(s ->
out.sendString(Flux.just(s.toUpperCase()))
))
.bind()
.block();
server.channel().closeFuture().sync();
}
}
但是,如果我们将 out.sendString
更改为
out.sendString(Flux.just(s.toUpperCase()).delayElements(Duration.ofSeconds(1)))
那么我们预计对于每个接收到的项目,都会产生一个延迟一秒的输出。
但是,服务器的行为方式是,如果它在间隔期间接收到多个项目,它将只为第一个项目产生输出。例如,下面我们在第一秒输入 aa
和 bb
,但只有 AA
作为输出(一秒后)产生:
$ nc localhost 3344
aa
bb
AA <after one second>
然后,如果我们稍后输入额外的行,我们会得到输出(一秒后),但是来自之前的输入:
cc
BB <after one second>
有什么想法可以使 send()
在延迟 Flux
的情况下按预期工作吗?
我认为您不应该为 out.sendString(...)
重新创建发布者
这有效:
DisposableServer server = TcpServer.create()
.port(3344)
.handle((in, out) -> out
.options(NettyPipeline.SendOptions::flushOnEach)
.sendString(in.receive()
.asString()
.map(String::toUpperCase)
.delayElements(Duration.ofSeconds(1))))
.bind()
.block();
server.channel().closeFuture().sync();
尝试使用concatMap。这有效:
DisposableServer server = TcpServer.create()
.port(3344)
.handle((in, out) -> in
.receive()
.asString()
.concatMap(s ->
out.sendString(Flux.just(s.toUpperCase())
.delayElements(Duration.ofSeconds(1)))
))
.bind()
.block();
server.channel().closeFuture().sync();
延迟传入流量
DisposableServer server = TcpServer.create()
.port(3344)
.handle((in, out) -> in
.receive()
.asString()
.timestamp()
.delayElements(Duration.ofSeconds(1))
.concatMap(tuple2 ->
out.sendString(
Flux.just(tuple2.getT2().toUpperCase() +
" " +
(System.currentTimeMillis() - tuple2.getT1())
))
))
.bind()
.block();