Implementation in Spring WebFlux "works", but I'm trying to understand "why?"

"You are thinking imperative, that first row will be executed then the second which is not the case in webflux. You have to think events-callbacks."

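I agree with this assessment (I have a lot of experience doing things "imperatively") and hope folks can help course-correct how I view the solution space. I've posted three different versions of the same "functionality", only one of which works (and I'm open to comments on how that version could/should be modified to better align with a reactive/functional implementation).

With the direction/help of the person who made the quoted assessment, I was able to get `DemoPOJOHandler.add(ServerRequest)` working. The code, along with the debug-level output, is shown below. I noticed that, after the HTTP POST "/v2/DemoPOJO" and "Mapped to mil.navy..." lines, there is an entry from reactor.netty.channel.FluxReceive stating "Subscribing inbound receiver...". This appears to be the critical action that is missing from my other two attempts.

My specific (albeit somewhat long) question is:

I "get" that thinking statement #1 will be executed, then statement #2, and so on, is the "imperative" view of the solution space. However, in the example below, that appears to be exactly the behavior that is occurring. The logger statement executes at 08:38:34.217, followed by the subscription at 08:38:34.251, then the DemoPOJO is instantiated at 08:38:34.267, and then it all "works".

However, the chaining in the request.bodyToMono()... sequence looks just like method chaining in imperative code (e.g. 'Integer.toString().indexOf()'), with the exception of the lambda (or is the presence of the lambda the reason "things change"?). So, if the request.bodyToMono()... sequence does not, in theory, need the ".then()" or ".switchIfEmpty()", why doesn't the core request.bodyToMono()... sequence execute "service.add(demoPOJO)" on its own? I understand that the Mono is not being subscribed to, but why does it seem to take additional statements in the chain for the subscription to happen and the POJO to be added to the repository?

This code executes successfully...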

@Component
public class DemoPOJOHandler {

    private Logger logger = LoggerFactory.getLogger(DemoPOJOHandler.class);

    @Autowired
    private DemoPOJOService service;

    public Mono<ServerResponse> add(ServerRequest request) {
        logger.debug("DemoPOJOHandler.add( ServerRequest )");

        return request.bodyToMono(DemoPOJO.class).doOnSuccess(demoPOJO -> service.add(demoPOJO))
                                                 .then(ServerResponse.ok().build())
                                                 .switchIfEmpty(ServerResponse.badRequest()
                                                                              .contentType(MediaType.APPLICATION_JSON)
                                                                              .build());
    }
}


2019-07-25 08:38:34.144 DEBUG 11992 --- [ctor-http-nio-2] io.netty.buffer.AbstractByteBuf          : -Dio.netty.buffer.checkAccessible: true
2019-07-25 08:38:34.145 DEBUG 11992 --- [ctor-http-nio-2] io.netty.buffer.AbstractByteBuf          : -Dio.netty.buffer.checkBounds: true
2019-07-25 08:38:34.145 DEBUG 11992 --- [ctor-http-nio-2] i.n.util.ResourceLeakDetectorFactory     : Loaded default ResourceLeakDetector: io.netty.util.ResourceLeakDetector@7a8a4d6a
2019-07-25 08:38:34.157 DEBUG 11992 --- [ctor-http-nio-2] r.n.http.server.HttpServerOperations     : [id: 0xa2da3d98, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:62644] New http connection, requesting read
2019-07-25 08:38:34.157 DEBUG 11992 --- [ctor-http-nio-3] r.n.http.server.HttpServerOperations     : [id: 0x5f552130, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:62645] New http connection, requesting read
2019-07-25 08:38:34.157 DEBUG 11992 --- [ctor-http-nio-3] reactor.netty.channel.BootstrapHandlers  : [id: 0x5f552130, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:62645] Initialized pipeline DefaultChannelPipeline{(BootstrapHandlers$BootstrapInitializerHandler#0 = reactor.netty.channel.BootstrapHandlers$BootstrapInitializerHandler), (reactor.left.httpCodec = io.netty.handler.codec.http.HttpServerCodec), (reactor.left.httpTrafficHandler = reactor.netty.http.server.HttpTrafficHandler), (reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)}
2019-07-25 08:38:34.157 DEBUG 11992 --- [ctor-http-nio-2] reactor.netty.channel.BootstrapHandlers  : [id: 0xa2da3d98, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:62644] Initialized pipeline DefaultChannelPipeline{(BootstrapHandlers$BootstrapInitializerHandler#0 = reactor.netty.channel.BootstrapHandlers$BootstrapInitializerHandler), (reactor.left.httpCodec = io.netty.handler.codec.http.HttpServerCodec), (reactor.left.httpTrafficHandler = reactor.netty.http.server.HttpTrafficHandler), (reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)}
2019-07-25 08:38:34.157 DEBUG 11992 --- [ctor-http-nio-3] io.netty.util.Recycler                   : -Dio.netty.recycler.maxCapacityPerThread: 4096
2019-07-25 08:38:34.157 DEBUG 11992 --- [ctor-http-nio-3] io.netty.util.Recycler                   : -Dio.netty.recycler.maxSharedCapacityFactor: 2
2019-07-25 08:38:34.157 DEBUG 11992 --- [ctor-http-nio-3] io.netty.util.Recycler                   : -Dio.netty.recycler.linkCapacity: 16
2019-07-25 08:38:34.157 DEBUG 11992 --- [ctor-http-nio-3] io.netty.util.Recycler                   : -Dio.netty.recycler.ratio: 8
2019-07-25 08:38:34.173 DEBUG 11992 --- [ctor-http-nio-3] r.n.http.server.HttpServerOperations     : [id: 0x5f552130, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:62645] Increasing pending responses, now 1
2019-07-25 08:38:34.173 DEBUG 11992 --- [ctor-http-nio-3] reactor.netty.http.server.HttpServer     : [id: 0x5f552130, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:62645] Handler is being applied: org.springframework.http.server.reactive.ReactorHttpHandlerAdapter@579c20c6
2019-07-25 08:38:34.195 DEBUG 11992 --- [ctor-http-nio-3] o.s.w.s.adapter.HttpWebHandlerAdapter    : [5f552130] HTTP POST "/v2/DemoPOJO"
2019-07-25 08:38:34.217 DEBUG 11992 --- [ctor-http-nio-3] o.s.w.r.f.s.s.RouterFunctionMapping      : [5f552130] Mapped to mil.navy.demo.DemoPOJO.DemoPOJORouter$$Lambda8/1123559518@22a8277c
2019-07-25 08:38:34.217 DEBUG 11992 --- [ctor-http-nio-3] mil.navy.demo.DemoPOJO.DemoPOJOHandler   : DemoPOJOHandler.add( ServerRequest )
2019-07-25 08:38:34.251 DEBUG 11992 --- [ctor-http-nio-3] reactor.netty.channel.FluxReceive        : [id: 0x5f552130, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:62645] Subscribing inbound receiver [pending: 0, cancelled:false, inboundDone: false]
2019-07-25 08:38:34.267 DEBUG 11992 --- [ctor-http-nio-3] mil.navy.demo.DemoPOJO.DemoPOJO          : DemoPOJO.DemoPOJO( 666, foo_666, 10666 )
2019-07-25 08:38:34.267 DEBUG 11992 --- [ctor-http-nio-3] mil.navy.demo.DemoPOJO.DemoPOJO          : DemoPOJO.toString()
2019-07-25 08:38:34.267 DEBUG 11992 --- [ctor-http-nio-3] o.s.http.codec.json.Jackson2JsonDecoder  : [5f552130] Decoded [666 :: foo_666 :: 10666]
2019-07-25 08:38:34.267 DEBUG 11992 --- [ctor-http-nio-3] mil.navy.demo.DemoPOJO.DemoPOJOService   : DemoPOJOService.add( DemoPOJO )
2019-07-25 08:38:34.267 DEBUG 11992 --- [ctor-http-nio-3] mil.navy.demo.DemoPOJO.DemoPOJORepo      : DemoPOJORepo.add( DemoPOJO )
2019-07-25 08:38:34.267 DEBUG 11992 --- [ctor-http-nio-3] mil.navy.demo.DemoPOJO.DemoPOJO          : DemoPOJO.getId()
2019-07-25 08:38:34.267 DEBUG 11992 --- [ctor-http-nio-3] mil.navy.demo.DemoPOJO.DemoPOJO          : DemoPOJO.getId()
2019-07-25 08:38:34.267 DEBUG 11992 --- [ctor-http-nio-3] mil.navy.demo.DemoPOJO.DemoPOJORepo      : DemoPOJORepo.add( DemoPOJO ) -> adding for id 666
2019-07-25 08:38:34.267 DEBUG 11992 --- [ctor-http-nio-3] mil.navy.demo.DemoPOJO.DemoPOJO          : DemoPOJO.getId()
2019-07-25 08:38:34.272 DEBUG 11992 --- [ctor-http-nio-3] o.s.w.s.adapter.HttpWebHandlerAdapter    : [5f552130] Completed 200 OK
2019-07-25 08:38:34.273 DEBUG 11992 --- [ctor-http-nio-3] r.n.http.server.HttpServerOperations     : [id: 0x5f552130, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:62645] Last HTTP response frame
2019-07-25 08:38:34.273 DEBUG 11992 --- [ctor-http-nio-3] r.n.http.server.HttpServerOperations     : [id: 0x5f552130, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:62645] No sendHeaders() called before complete, sending zero-length header
2019-07-25 08:38:34.274 DEBUG 11992 --- [ctor-http-nio-3] r.n.http.server.HttpServerOperations     : [id: 0x5f552130, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:62645] Decreasing pending responses, now 0
2019-07-25 08:38:34.275 DEBUG 11992 --- [ctor-http-nio-3] r.n.http.server.HttpServerOperations     : [id: 0x5f552130, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:62645] Last HTTP packet was sent, terminating the channel
2019-07-25 08:38:41.720 DEBUG 11992 --- [169.254.211.161] sun.rmi.transport.tcp                    : RMI TCP Connection(4)-169.254.211.161: (port 62610) connection closed
2019-07-25 08:38:41.720 DEBUG 11992 --- [169.254.211.161] sun.rmi.transport.tcp                    : RMI TCP Connection(4)-169.254.211.161: close connection

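This code executes "without errors", but never subscribes, and so never executes the "doOnSuccess(...)" part of the chain. But it seems like it should? What is the "magic" about chaining the separate return... statement onto the 'request.bodyToMono(...)' statement with ".then(...)"?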

@Component
public class DemoPOJOHandler {

    private Logger logger = LoggerFactory.getLogger(DemoPOJOHandler.class);

    @Autowired
    private DemoPOJOService service;

    public Mono<ServerResponse> add(ServerRequest request) {
        logger.debug("DemoPOJOHandler.add( ServerRequest )");

        request.bodyToMono(DemoPOJO.class).doOnSuccess(demoPOJO -> System.out.println("DEMO -> " + demoPOJO.toString()));
        return  ServerResponse.ok().build();
    }
}


2019-07-25 08:40:16.155 DEBUG 17064 --- [169.254.211.161] sun.rmi.transport.tcp                    : RMI TCP Connection(4)-169.254.211.161: (port 62661) connection closed
2019-07-25 08:40:16.155 DEBUG 17064 --- [169.254.211.161] sun.rmi.transport.tcp                    : RMI TCP Connection(4)-169.254.211.161: close connection
2019-07-25 08:40:18.248 DEBUG 17064 --- [ctor-http-nio-2] io.netty.buffer.AbstractByteBuf          : -Dio.netty.buffer.checkAccessible: true
2019-07-25 08:40:18.248 DEBUG 17064 --- [ctor-http-nio-2] io.netty.buffer.AbstractByteBuf          : -Dio.netty.buffer.checkBounds: true
2019-07-25 08:40:18.248 DEBUG 17064 --- [ctor-http-nio-2] i.n.util.ResourceLeakDetectorFactory     : Loaded default ResourceLeakDetector: io.netty.util.ResourceLeakDetector@3860465a
2019-07-25 08:40:18.266 DEBUG 17064 --- [ctor-http-nio-2] r.n.http.server.HttpServerOperations     : [id: 0x768a1f21, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:62695] New http connection, requesting read
2019-07-25 08:40:18.266 DEBUG 17064 --- [ctor-http-nio-3] r.n.http.server.HttpServerOperations     : [id: 0x51900c31, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:62696] New http connection, requesting read
2019-07-25 08:40:18.267 DEBUG 17064 --- [ctor-http-nio-3] reactor.netty.channel.BootstrapHandlers  : [id: 0x51900c31, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:62696] Initialized pipeline DefaultChannelPipeline{(BootstrapHandlers$BootstrapInitializerHandler#0 = reactor.netty.channel.BootstrapHandlers$BootstrapInitializerHandler), (reactor.left.httpCodec = io.netty.handler.codec.http.HttpServerCodec), (reactor.left.httpTrafficHandler = reactor.netty.http.server.HttpTrafficHandler), (reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)}
2019-07-25 08:40:18.267 DEBUG 17064 --- [ctor-http-nio-2] reactor.netty.channel.BootstrapHandlers  : [id: 0x768a1f21, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:62695] Initialized pipeline DefaultChannelPipeline{(BootstrapHandlers$BootstrapInitializerHandler#0 = reactor.netty.channel.BootstrapHandlers$BootstrapInitializerHandler), (reactor.left.httpCodec = io.netty.handler.codec.http.HttpServerCodec), (reactor.left.httpTrafficHandler = reactor.netty.http.server.HttpTrafficHandler), (reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)}
2019-07-25 08:40:18.273 DEBUG 17064 --- [ctor-http-nio-3] io.netty.util.Recycler                   : -Dio.netty.recycler.maxCapacityPerThread: 4096
2019-07-25 08:40:18.273 DEBUG 17064 --- [ctor-http-nio-3] io.netty.util.Recycler                   : -Dio.netty.recycler.maxSharedCapacityFactor: 2
2019-07-25 08:40:18.273 DEBUG 17064 --- [ctor-http-nio-3] io.netty.util.Recycler                   : -Dio.netty.recycler.linkCapacity: 16
2019-07-25 08:40:18.273 DEBUG 17064 --- [ctor-http-nio-3] io.netty.util.Recycler                   : -Dio.netty.recycler.ratio: 8
2019-07-25 08:40:18.285 DEBUG 17064 --- [ctor-http-nio-3] r.n.http.server.HttpServerOperations     : [id: 0x51900c31, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:62696] Increasing pending responses, now 1
2019-07-25 08:40:18.289 DEBUG 17064 --- [ctor-http-nio-3] reactor.netty.http.server.HttpServer     : [id: 0x51900c31, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:62696] Handler is being applied: org.springframework.http.server.reactive.ReactorHttpHandlerAdapter@7fa4fcbc
2019-07-25 08:40:18.297 DEBUG 17064 --- [ctor-http-nio-3] o.s.w.s.adapter.HttpWebHandlerAdapter    : [51900c31] HTTP POST "/v2/DemoPOJO"
2019-07-25 08:40:18.315 DEBUG 17064 --- [ctor-http-nio-3] o.s.w.r.f.s.s.RouterFunctionMapping      : [51900c31] Mapped to mil.navy.demo.DemoPOJO.DemoPOJORouter$$Lambda2/1446001495@27a07cfc
2019-07-25 08:40:18.316 DEBUG 17064 --- [ctor-http-nio-3] mil.navy.demo.DemoPOJO.DemoPOJOHandler   : DemoPOJOHandler.add( ServerRequest )
2019-07-25 08:40:18.358 DEBUG 17064 --- [ctor-http-nio-3] o.s.w.s.adapter.HttpWebHandlerAdapter    : [51900c31] Completed 200 OK
2019-07-25 08:40:18.359 DEBUG 17064 --- [ctor-http-nio-3] r.n.http.server.HttpServerOperations     : [id: 0x51900c31, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:62696] Last HTTP response frame
2019-07-25 08:40:18.359 DEBUG 17064 --- [ctor-http-nio-3] r.n.http.server.HttpServerOperations     : [id: 0x51900c31, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:62696] No sendHeaders() called before complete, sending zero-length header
2019-07-25 08:40:18.360 DEBUG 17064 --- [ctor-http-nio-3] r.n.http.server.HttpServerOperations     : [id: 0x51900c31, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:62696] Decreasing pending responses, now 0
2019-07-25 08:40:18.361 DEBUG 17064 --- [ctor-http-nio-3] r.n.http.server.HttpServerOperations     : [id: 0x51900c31, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:62696] Last HTTP packet was sent, terminating the channel
2019-07-25 08:40:18.366 DEBUG 17064 --- [ctor-http-nio-3] r.n.channel.ChannelOperationsHandler     : [id: 0x51900c31, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:62696] No ChannelOperation attached. Dropping: 
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 7b 0a 20 20 20 20 22 69 64 22 3a 20 36 36 36 2c |{.    "id": 666,|
|00000010| 0a 20 20 20 20 22 6e 61 6d 65 22 3a 20 22 66 6f |.    "name": "fo|
|00000020| 6f 5f 36 36 36 22 2c 0a 20 20 20 20 22 76 61 6c |o_666",.    "val|
|00000030| 75 65 22 3a 20 31 30 36 36 36 0a 7d             |ue": 10666.}    |
+--------+-------------------------------------------------+----------------+

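This code simply blows up with an NPE. My failed logic was: "well, if 'doOnSuccess(...)' isn't happening because the Mono isn't being subscribed to, then 'subscribe'". Obviously, that is not the solution. Less obvious (to me, at this point in time) is "why?".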

@Component
public class DemoPOJOHandler {

    private Logger logger = LoggerFactory.getLogger(DemoPOJOHandler.class);

    @Autowired
    private DemoPOJOService service;

    public Mono<ServerResponse> add(ServerRequest request) {
        logger.debug("DemoPOJOHandler.add( ServerRequest )");

        request.bodyToMono(DemoPOJO.class).doOnSuccess(demoPOJO -> System.out.println("DEMO -> " + demoPOJO.toString()))
                                          .subscribe();
        return  ServerResponse.ok().build();
    }
}


reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.NullPointerException
Caused by: java.lang.NullPointerException: null
at mil.navy.demo.DemoPOJO.DemoPOJOHandler.lambda$add(DemoPOJOHandler.java:73) ~[classes/:na]
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onComplete(MonoPeekTerminal.java:311) [reactor-core-3.2.10.RELEASE.jar:3.2.10.RELEASE]
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:1743) ~[reactor-core-3.2.10.RELEASE.jar:3.2.10.RELEASE]
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:1743) ~[reactor-core-3.2.10.RELEASE.jar:3.2.10.RELEASE]
at reactor.core.publisher.MonoSingle$SingleSubscriber.onComplete(MonoSingle.java:155) ~[reactor-core-3.2.10.RELEASE.jar:3.2.10.RELEASE]
at reactor.core.publisher.FluxFlatMap$FlatMapMain.checkTerminated(FluxFlatMap.java:794) ~[reactor-core-3.2.10.RELEASE.jar:3.2.10.RELEASE]
at reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:560) ~[reactor-core-3.2.10.RELEASE.jar:3.2.10.RELEASE]
at reactor.core.publisher.FluxFlatMap$FlatMapMain.drain(FluxFlatMap.java:540) ~[reactor-core-3.2.10.RELEASE.jar:3.2.10.RELEASE]
at reactor.core.publisher.FluxFlatMap$FlatMapMain.onComplete(FluxFlatMap.java:426) ~[reactor-core-3.2.10.RELEASE.jar:3.2.10.RELEASE]
at reactor.core.publisher.FluxFlatMap$FlatMapMain.checkTerminated(FluxFlatMap.java:794) ~[reactor-core-3.2.10.RELEASE.jar:3.2.10.RELEASE]
at reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:560) ~[reactor-core-3.2.10.RELEASE.jar:3.2.10.RELEASE]
at reactor.core.publisher.FluxFlatMap$FlatMapMain.drain(FluxFlatMap.java:540) ~[reactor-core-3.2.10.RELEASE.jar:3.2.10.RELEASE]
at reactor.core.publisher.FluxFlatMap$FlatMapMain.onComplete(FluxFlatMap.java:426) ~[reactor-core-3.2.10.RELEASE.jar:3.2.10.RELEASE]
at reactor.core.publisher.DrainUtils.postCompleteDrain(DrainUtils.java:131) ~[reactor-core-3.2.10.RELEASE.jar:3.2.10.RELEASE]
at reactor.core.publisher.DrainUtils.postComplete(DrainUtils.java:186) ~[reactor-core-3.2.10.RELEASE.jar:3.2.10.RELEASE]
at reactor.core.publisher.FluxMapSignal$FluxMapSignalSubscriber.onComplete(FluxMapSignal.java:213) ~[reactor-core-3.2.10.RELEASE.jar:3.2.10.RELEASE]
at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:136) ~[reactor-core-3.2.10.RELEASE.jar:3.2.10.RELEASE]
at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:252) ~[reactor-core-3.2.10.RELEASE.jar:3.2.10.RELEASE]
at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:136) ~[reactor-core-3.2.10.RELEASE.jar:3.2.10.RELEASE]
at reactor.netty.channel.FluxReceive.terminateReceiver(FluxReceive.java:390) ~[reactor-netty-0.8.9.RELEASE.jar:0.8.9.RELEASE]
at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:197) ~[reactor-netty-0.8.9.RELEASE.jar:0.8.9.RELEASE]
at reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:338) ~[reactor-netty-0.8.9.RELEASE.jar:0.8.9.RELEASE]
at reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:350) [reactor-netty-0.8.9.RELEASE.jar:0.8.9.RELEASE]
at reactor.netty.channel.ChannelOperations.terminate(ChannelOperations.java:399) [reactor-netty-0.8.9.RELEASE.jar:0.8.9.RELEASE]
at reactor.netty.http.server.HttpServerOperations.cleanHandlerTerminate(HttpServerOperations.java:519) [reactor-netty-0.8.9.RELEASE.jar:0.8.9.RELEASE]
at reactor.netty.http.server.HttpTrafficHandler.operationComplete(HttpTrafficHandler.java:314) [reactor-netty-0.8.9.RELEASE.jar:0.8.9.RELEASE]

    (... lots of stuff deleted to fit posting constraints ...)

2019-07-25 10:44:43.212 DEBUG 10544 --- [ctor-http-nio-3] r.n.channel.ChannelOperationsHandler     : [id: 0xa62e89df, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:64515] No ChannelOperation attached. Dropping: 
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 7b 0a 20 20 20 20 22 69 64 22 3a 20 36 36 36 2c |{.    "id": 666,|
|00000010| 0a 20 20 20 20 22 6e 61 6d 65 22 3a 20 22 66 6f |.    "name": "fo|
|00000020| 6f 5f 36 36 36 22 2c 0a 20 20 20 20 22 76 61 6c |o_666",.    "val|
|00000030| 75 65 22 3a 20 31 30 36 36 36 0a 7d             |ue": 10666.}    |
+--------+-------------------------------------------------+----------------+
2019-07-25 10:45:08.112 DEBUG 10544 --- [169.254.211.161] sun.rmi.transport.tcp                    : RMI TCP Connection(3)-169.254.211.161: (port 64485) connection closed
2019-07-25 10:45:08.112 DEBUG 10544 --- [169.254.211.161] sun.rmi.transport.tcp                    : RMI TCP Connection(3)-169.254.211.161: close connection

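Since I am the one you quoted at the top, I will try to answer your question.

First we need to talk about "non-blocking". What is "non-blocking"? Non-blocking is event-based. The underlying server, Netty, does not assign one thread per request; instead it uses event chains and event queues.

So when someone subscribes, Netty sets up a basic event queue (sort of), which basically works like this: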

x <- y <- z

To get x we need to resolve y, but to get y we need to resolve z. This is what people usually mean by the "functional" part of this kind of programming.
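To make the x <- y <- z idea concrete, here is a minimal sketch in plain Java. It uses java.util.function.Supplier as a stand-in for a lazy step; the class LazyChainDemo and its trace list are made up for illustration, and this is not how Reactor or Netty actually implement it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

class LazyChainDemo {
    // Records the order in which the steps actually run.
    static final List<String> trace = new ArrayList<>();

    // Assembles x <- y <- z. Nothing is executed here, the steps are only described.
    static Supplier<Integer> buildChain() {
        Supplier<Integer> z = () -> { trace.add("z"); return 1; };
        Supplier<Integer> y = () -> { int v = z.get(); trace.add("y"); return v + 1; };
        Supplier<Integer> x = () -> { int v = y.get(); trace.add("x"); return v * 10; };
        return x;
    }

    public static void main(String[] args) {
        Supplier<Integer> x = buildChain();
        System.out.println("after assembly: " + trace);
        System.out.println("x = " + x.get());
        System.out.println("after demand:   " + trace);
    }
}
```

Building the chain records nothing in trace. Only when something asks for x does x pull on y, which pulls on z, so the steps run in the order z, y, x.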

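One of the most common mistakes I see when people start out with reactive programming is that they don't understand that the subscriber is the calling client. Your Spring application is the publisher, and every client that calls your service is a subscriber.

You should never subscribe in your application.

Why would your publishing application subscribe to itself? When you explain it that way, people usually get it.

So let's look at your examples. I will go through them in reverse order:

Example 3: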

public Mono<ServerResponse> add(ServerRequest request) {
    logger.debug("DemoPOJOHandler.add( ServerRequest )");

    request.bodyToMono(DemoPOJO.class).doOnSuccess(demoPOJO -> System.out.println("DEMO -> " + demoPOJO.toString()))
                                      .subscribe();
    return  ServerResponse.ok().build();
}

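Here we enter the method imperatively and hand it the request. The ServerRequest is a concrete object, but as soon as you call bodyToMono you get back a Mono<DemoPOJO>, which you can think of as something like a deferred CompletableFuture: a wrapper holding a computation (taking the body from the request and putting it into your DTO) that has not run yet.

When this computation completes, the Mono enters a success state and triggers whatever comes next in the chain, so doOnSuccess fires. doOnSuccess itself returns a Mono<DemoPOJO>, so the chain can continue from there. Note that doOnSuccess also fires when the Mono completes empty, in which case the callback receives null; that would be consistent with the NullPointerException raised from onComplete in your stack trace.

This is where your problem is: after doOnSuccess, you subscribe. So what you are doing here is that whenever someone posts a ServerRequest to your application, Netty (the server) sets up an event chain, and within that event chain the application subscribes to itself.

Here the chain is completed by the application subscribing to itself. So the application is its own client.

Example 2: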

public Mono<ServerResponse> add(ServerRequest request) {
    logger.debug("DemoPOJOHandler.add( ServerRequest )");

    request.bodyToMono(DemoPOJO.class).doOnSuccess(demoPOJO -> System.out.println("DEMO -> " + demoPOJO.toString()));
    return  ServerResponse.ok().build();
}

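Here we do the same thing as in Example 3, minus the subscribe. When the event chain is set up, the request is mapped to a DTO and then we do something in doOnSuccess, but the chain is broken: the Mono that doOnSuccess returns is thrown away, so when doOnSuccess signals that it is done, nothing after it is listening.

So the event chain here is broken; it is incomplete. Nothing happens until someone subscribes, but since the chain is broken, no one can subscribe to it, and so nothing happens.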
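The difference between a chain that is thrown away and one that is returned can be sketched without Reactor. In the sketch below a Supplier plays the role of the Mono and serve(...) plays the role of the framework; body(), brokenHandler(), linkedHandler() and serve() are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

class DiscardedChainDemo {
    // Records side effects so we can see which chain actually ran.
    static final List<String> sideEffects = new ArrayList<>();

    // Hypothetical stand-in for request.bodyToMono(...): a lazy source of the body.
    static Supplier<String> body() {
        return () -> "{\"id\": 666}";
    }

    // Shaped like Example 2: a chain is built, then dropped, and an unrelated result is returned.
    static Supplier<String> brokenHandler() {
        Supplier<String> discarded = () -> {
            String b = body().get();
            sideEffects.add("saved " + b);
            return b;
        };
        return () -> "200 OK"; // 'discarded' is never invoked by anyone
    }

    // Shaped like Example 1: the side effect is a link in the chain that is returned.
    static Supplier<String> linkedHandler() {
        Supplier<String> save = () -> {
            String b = body().get();
            sideEffects.add("saved " + b);
            return b;
        };
        return () -> {
            save.get();
            return "200 OK";
        };
    }

    // Stand-in for the framework: it only ever pulls on what the handler returned.
    static String serve(Supplier<String> handlerResult) {
        return handlerResult.get();
    }

    public static void main(String[] args) {
        System.out.println(serve(brokenHandler()) + " -> " + sideEffects);
        System.out.println(serve(linkedHandler()) + " -> " + sideEffects);
    }
}
```

Both handlers answer with "200 OK", but only the linked one records a side effect, because the framework stand-in can only reach what was returned to it.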

Example 1:

public Mono<ServerResponse> add(ServerRequest request) {
    logger.debug("DemoPOJOHandler.add( ServerRequest )");

    return request.bodyToMono(DemoPOJO.class).doOnSuccess(demoPOJO -> service.add(demoPOJO))
                                             .then(ServerResponse.ok().build())
                                             .switchIfEmpty(ServerResponse.badRequest()
                                                                          .contentType(MediaType.APPLICATION_JSON)
                                                                          .build());
}

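Here the chain is complete. Something is signaling something, which is signaling something else; when one thing completes, the next thing fires, then the next, and the next.

The calling client posts data, the server sets up the event chain, and because the chain is complete the client can subscribe. The client subscribes, the event chain kicks off, all the callbacks fire, and the data is returned.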
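As a rough illustration of "nothing happens until someone subscribes", the sketch below uses the JDK's java.util.concurrent.Flow interfaces, which have the same publisher/subscriber shape as Reactive Streams. The just(...) publisher and the serve(...) method are made-up stand-ins, the publisher skips most of the rules a real implementation has to follow, and in a real WebFlux application it is the framework that does the subscribing on behalf of the calling client.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

class SubscribeDemo {
    // Records what happened and in which order.
    static final List<String> events = new ArrayList<>();

    // A toy single-value publisher: it produces nothing until it is subscribed to
    // and the subscriber requests an item. Simplified, not spec-compliant.
    static Flow.Publisher<String> just(String value) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private boolean done;

            @Override
            public void request(long n) {
                if (done || n <= 0) {
                    return;
                }
                done = true;
                events.add("producing " + value);
                subscriber.onNext(value);
                subscriber.onComplete();
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    // Stand-in for the server: subscribes to whatever the handler returned.
    static String serve(Flow.Publisher<String> handlerResult) {
        StringBuilder response = new StringBuilder();
        handlerResult.subscribe(new Flow.Subscriber<String>() {
            @Override
            public void onSubscribe(Flow.Subscription s) {
                events.add("subscribed");
                s.request(1);
            }

            @Override
            public void onNext(String item) {
                response.append(item);
            }

            @Override
            public void onError(Throwable t) {
                response.append("error");
            }

            @Override
            public void onComplete() {
                events.add("completed");
            }
        });
        return response.toString();
    }

    public static void main(String[] args) {
        Flow.Publisher<String> result = just("200 OK");
        System.out.println("before subscribe: " + events);
        System.out.println("response: " + serve(result));
        System.out.println("after subscribe:  " + events);
    }
}
```

Creating the publisher records no events. The "producing" step only appears after serve(...) has subscribed and requested an item, which is the same ordering you see in your working log: handler first, "Subscribing inbound receiver" afterwards, then the actual work.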

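Flux<T> and Mono<T> behave much like monads, in the same family as CompletableFuture<T>. Optional<T> and Stream<T> are also monads; monads come from the functional world, from programming languages like Haskell. A good way to understand how they work is to learn more about monads.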
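If the word "monad" is unfamiliar, the shape is easier to see in JDK types you already know. The sketch below chains steps with Optional.flatMap and CompletableFuture.thenCompose; the class MonadShapeDemo and its helper methods are invented for this example. Reactor's flatMap on Mono follows the same "wrap a value, chain a function that returns another wrapped value" pattern.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

class MonadShapeDemo {
    // Wraps a possibly failing parse in an Optional.
    static Optional<Integer> parse(String s) {
        try {
            return Optional.of(Integer.parseInt(s.trim()));
        } catch (NumberFormatException e) {
            return Optional.empty();
        }
    }

    // A second step that may also produce "nothing".
    static Optional<Integer> half(int n) {
        return n % 2 == 0 ? Optional.of(n / 2) : Optional.empty();
    }

    // flatMap chains the two steps; an empty result short-circuits the rest.
    static Optional<Integer> parseAndHalve(String s) {
        return parse(s).flatMap(MonadShapeDemo::half);
    }

    // The same shape with CompletableFuture: thenCompose plays the role of flatMap.
    static CompletableFuture<Integer> parseAndHalveAsync(String s) {
        return CompletableFuture.supplyAsync(() -> Integer.parseInt(s.trim()))
                .thenCompose(n -> CompletableFuture.completedFuture(n / 2));
    }

    public static void main(String[] args) {
        System.out.println(parseAndHalve("10"));
        System.out.println(parseAndHalve("7"));
        System.out.println(parseAndHalveAsync("10").join());
    }
}
```

One difference worth keeping in mind: supplyAsync starts its work as soon as it is called, whereas a Mono does nothing until it is subscribed to.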

If you want to learn more about monads in general, I will shamelessly plug my own article:

Write a Monad, in Java, seriously?

Another good read is Intro to reactive programming; I recommend going through all of its examples.