RSocket channel error : "reactor.core.publisher.Operators.error - Operator called default onErrorDropped" with merged flux

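I want to create an RSocket channel where the data sent by the server can be either a response to a client request or a push. I use a Flux merge for this.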

Example with reference data: the client can request a refresh, and the server can also push updates.

So I have this on the server side:

    @MessageMapping("update-stream")
    Flux<DomainObject> addUpdatesListener(Flux<RefreshRequest> requests) {
        Flux<DomainObject> pushFlux = Flux.from(this.flux)
            .doOnError((e) -> log.error("Error on push flux : {}", e, e));
        return requests
                .map(this::getUpdates)
                .flatMap(Flux::fromIterable)
                .doOnError((e) -> log.error("Error on channel flux : {}", e, e))
                .mergeWith(pushFlux)
                .doOnError((e) -> log.error("Error on merged flux : {}", e, e));
    }

It works fine, except that I get the following error when I stop the client:

06-07-2020 15:58:53.168 [reactor-http-nio-3] ERROR reactor.core.publisher.Operators.error - Operator called default onErrorDropped
java.util.concurrent.CancellationException: Disposed
    at reactor.core.publisher.FluxProcessor.dispose(FluxProcessor.java:80)
    at io.rsocket.core.RSocketResponder.hookOnCancel(RSocketResponder.java:513)
    at reactor.core.publisher.BaseSubscriber.cancel(BaseSubscriber.java:230)
    at java.base/java.lang.Iterable.forEach(Iterable.java:75)
    at io.rsocket.core.RSocketResponder.cleanUpSendingSubscriptions(RSocketResponder.java:275)
    at io.rsocket.core.RSocketResponder.cleanup(RSocketResponder.java:265)
    at io.rsocket.core.RSocketResponder.tryTerminate(RSocketResponder.java:167)
    at io.rsocket.core.RSocketResponder.tryTerminateOnConnectionClose(RSocketResponder.java:160)
    at reactor.core.publisher.LambdaMonoSubscriber.onComplete(LambdaMonoSubscriber.java:132)
    at reactor.core.publisher.MonoProcessor$NextInner.onComplete(MonoProcessor.java:518)
    at reactor.core.publisher.MonoProcessor.onNext(MonoProcessor.java:308)
    at reactor.core.publisher.MonoProcessor.onComplete(MonoProcessor.java:265)
    at io.rsocket.internal.BaseDuplexConnection.dispose(BaseDuplexConnection.java:23)
    at io.rsocket.transport.netty.TcpDuplexConnection.lambda$new$0(TcpDuplexConnection.java:60)
    at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:577)
    at io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:570)
    at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:549)
    at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:490)
    at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:615)
    at io.netty.util.concurrent.DefaultPromise.setSuccess0(DefaultPromise.java:604)
    at io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:104)
    at io.netty.channel.DefaultChannelPromise.trySuccess(DefaultChannelPromise.java:84)
    at io.netty.channel.AbstractChannel$CloseFuture.setClosed(AbstractChannel.java:1158)
    at io.netty.channel.AbstractChannel$AbstractUnsafe.doClose0(AbstractChannel.java:760)
    at io.netty.channel.AbstractChannel$AbstractUnsafe.close(AbstractChannel.java:736)
    at io.netty.channel.AbstractChannel$AbstractUnsafe.close(AbstractChannel.java:607)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.closeOnRead(AbstractNioByteChannel.java:105)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:171)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
    at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:989)
    at io.netty.util.internal.ThreadExecutorMap.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Thread.java:834)  

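If I don't do the merge, I get no error.

I tried many different versions, but I can't find a way to both push updates and have no error logged when the client exits.

What am I missing?

Thanks a lot.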

I upgraded from spring-boot 2.3.0.RELEASE to 2.3.1.RELEASE.

The problem went away.
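
For reference, a minimal sketch of that version bump, assuming the project is built with Maven and inherits from `spring-boot-starter-parent` (the post does not say which build tool or project layout is used, so treat both as assumptions):

    <!-- pom.xml (assumed Maven layout): only the version line changes -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.1.RELEASE</version> <!-- was 2.3.0.RELEASE -->
        <relativePath/>
    </parent>

With this setup the RSocket and Reactor versions are managed by Spring Boot, so they should follow the parent version as long as the project does not override them explicitly.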