io.netty.util.IllegalReferenceCountException: refCnt: 0, 减量: 1
io.netty.util.IllegalReferenceCountException: refCnt: 0, decrement: 1
订阅发布-订阅 Redis 时出现以下异常。
io.netty.util.IllegalReferenceCountException: refCnt: 0, decrement: 1
at io.netty.util.internal.ReferenceCountUpdater.toLiveRealRefCnt(ReferenceCountUpdater.java:74)
at io.netty.util.internal.ReferenceCountUpdater.release(ReferenceCountUpdater.java:138)
at io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:100)
at io.netty.handler.codec.http.DefaultFullHttpRequest.release(DefaultFullHttpRequest.java:103)
at io.netty.util.ReferenceCountUtil.release(ReferenceCountUtil.java:88)
at reactor.netty.http.server.HttpServerOperations.compression(HttpServerOperations.java:509)
at reactor.netty.http.server.HttpServerOperations.afterMarkSentHeaders(HttpServerOperations.java:580)
at reactor.netty.http.HttpOperations.lambda$then(HttpOperations.java:199)
at reactor.netty.FutureMono$DeferredFutureMono.subscribe(FutureMono.java:115)
at reactor.core.publisher.Mono.subscribe(Mono.java:4150)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:255)
at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51)
at reactor.core.publisher.Mono.subscribe(Mono.java:4150)
at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.onComplete(FluxConcatArray.java:208)
at reactor.core.publisher.FluxConcatArray.subscribe(FluxConcatArray.java:80)
at reactor.core.publisher.Mono.subscribe(Mono.java:4150)
at org.springframework.http.server.reactive.ChannelSendOperator$WriteBarrier.onNext(ChannelSendOperator.java:192)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:127)
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1815)
at reactor.core.publisher.MonoCollectList$MonoCollectListSubscriber.onComplete(MonoCollectList.java:128)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2057)
at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:142)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.checkTerminated(FluxFlatMap.java:846)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:608)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.drain(FluxFlatMap.java:588)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.onComplete(FluxFlatMap.java:465)
at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drainAsync(FluxFlattenIterable.java:341)
at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drain(FluxFlattenIterable.java:686)
at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.onComplete(FluxFlattenIterable.java:267)
at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:142)
at reactor.core.publisher.MonoNext$NextSubscriber.onComplete(MonoNext.java:102)
at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:83)
at reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.onNext(FluxUsingWhen.java:358)
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120)
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120)
at reactor.core.publisher.FluxFilter$FilterSubscriber.onNext(FluxFilter.java:113)
at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:82)
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext(MonoFlatMapMany.java:250)
at reactor.core.publisher.FluxDefaultIfEmpty$DefaultIfEmptySubscriber.onNext(FluxDefaultIfEmpty.java:100)
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120)
at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:199)
at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:82)
at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:82)
at io.lettuce.core.RedisPublisher$ImmediateSubscriber.onNext(RedisPublisher.java:886)
at io.lettuce.core.RedisPublisher$RedisSubscription.onNext(RedisPublisher.java:291)
at io.lettuce.core.RedisPublisher$SubscriptionCommand.doOnComplete(RedisPublisher.java:773)
at io.lettuce.core.protocol.CommandWrapper.complete(CommandWrapper.java:65)
at io.lettuce.core.protocol.CommandWrapper.complete(CommandWrapper.java:63)
at io.lettuce.core.cluster.ClusterCommand.complete(ClusterCommand.java:65)
at io.lettuce.core.protocol.CommandWrapper.complete(CommandWrapper.java:63)
at io.lettuce.core.protocol.CommandHandler.complete(CommandHandler.java:746)
at io.lettuce.core.protocol.CommandHandler.decode(CommandHandler.java:681)
at io.lettuce.core.protocol.CommandHandler.channelRead(CommandHandler.java:598)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:795)
at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:480)
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:829)
我使用的代码
reactiveMsgListenerContainer
.receive(weatherRedisTopic)
.onBackpressureBuffer(1000)
.map(ReactiveSubscription.Message<String, String>::getMessage)
.map { msg ->
objectMapper.deserialize<List<RealtimeweatherDto>>(msg)
}
.subscribe(
{ realtimeweather: List<RealtimeweatherDto> ->
val weatherData: Map<String, RealtimeweatherDto> =
realtimeweather.associateBy({ it.weatherer.symbol + ":" + it.weatherer.country }, { it })
sink.tryEmitNext(weatherData)
},
{ err ->
logger.error(err) { "Error occurred in ${weatherRedisTopic.topic} redis subscription channel" }
}
)
我的预感是我在 pub-sub 通道中获取的数据太大,因此出现此错误。还有其他建议吗?
我怎样才能增加代码大小。
我正在使用 Spring webflux 2.5.1。有spring.codec.max-in-memory-size
属性 是不是应该增加
P.S:
这似乎与我们使用的WebClient有关。我们最近增加了编解码器大小
private val bufferedMemoryLimitInMb = 10 * 1024 * 1024
val webClient = WebClient.builder()
.codecs { configurer -> configurer.defaultCodecs().maxInMemorySize(bufferedMemoryLimitInMb ) }.build()
迁移到 Spring 启动 2.5.8 后。这个问题已为我解决。
订阅发布-订阅 Redis 时出现以下异常。
io.netty.util.IllegalReferenceCountException: refCnt: 0, decrement: 1
at io.netty.util.internal.ReferenceCountUpdater.toLiveRealRefCnt(ReferenceCountUpdater.java:74)
at io.netty.util.internal.ReferenceCountUpdater.release(ReferenceCountUpdater.java:138)
at io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:100)
at io.netty.handler.codec.http.DefaultFullHttpRequest.release(DefaultFullHttpRequest.java:103)
at io.netty.util.ReferenceCountUtil.release(ReferenceCountUtil.java:88)
at reactor.netty.http.server.HttpServerOperations.compression(HttpServerOperations.java:509)
at reactor.netty.http.server.HttpServerOperations.afterMarkSentHeaders(HttpServerOperations.java:580)
at reactor.netty.http.HttpOperations.lambda$then(HttpOperations.java:199)
at reactor.netty.FutureMono$DeferredFutureMono.subscribe(FutureMono.java:115)
at reactor.core.publisher.Mono.subscribe(Mono.java:4150)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:255)
at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51)
at reactor.core.publisher.Mono.subscribe(Mono.java:4150)
at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.onComplete(FluxConcatArray.java:208)
at reactor.core.publisher.FluxConcatArray.subscribe(FluxConcatArray.java:80)
at reactor.core.publisher.Mono.subscribe(Mono.java:4150)
at org.springframework.http.server.reactive.ChannelSendOperator$WriteBarrier.onNext(ChannelSendOperator.java:192)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:127)
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1815)
at reactor.core.publisher.MonoCollectList$MonoCollectListSubscriber.onComplete(MonoCollectList.java:128)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2057)
at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:142)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.checkTerminated(FluxFlatMap.java:846)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:608)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.drain(FluxFlatMap.java:588)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.onComplete(FluxFlatMap.java:465)
at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drainAsync(FluxFlattenIterable.java:341)
at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drain(FluxFlattenIterable.java:686)
at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.onComplete(FluxFlattenIterable.java:267)
at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:142)
at reactor.core.publisher.MonoNext$NextSubscriber.onComplete(MonoNext.java:102)
at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:83)
at reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.onNext(FluxUsingWhen.java:358)
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120)
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120)
at reactor.core.publisher.FluxFilter$FilterSubscriber.onNext(FluxFilter.java:113)
at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:82)
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext(MonoFlatMapMany.java:250)
at reactor.core.publisher.FluxDefaultIfEmpty$DefaultIfEmptySubscriber.onNext(FluxDefaultIfEmpty.java:100)
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120)
at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:199)
at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:82)
at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:82)
at io.lettuce.core.RedisPublisher$ImmediateSubscriber.onNext(RedisPublisher.java:886)
at io.lettuce.core.RedisPublisher$RedisSubscription.onNext(RedisPublisher.java:291)
at io.lettuce.core.RedisPublisher$SubscriptionCommand.doOnComplete(RedisPublisher.java:773)
at io.lettuce.core.protocol.CommandWrapper.complete(CommandWrapper.java:65)
at io.lettuce.core.protocol.CommandWrapper.complete(CommandWrapper.java:63)
at io.lettuce.core.cluster.ClusterCommand.complete(ClusterCommand.java:65)
at io.lettuce.core.protocol.CommandWrapper.complete(CommandWrapper.java:63)
at io.lettuce.core.protocol.CommandHandler.complete(CommandHandler.java:746)
at io.lettuce.core.protocol.CommandHandler.decode(CommandHandler.java:681)
at io.lettuce.core.protocol.CommandHandler.channelRead(CommandHandler.java:598)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:795)
at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:480)
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:829)
我使用的代码
reactiveMsgListenerContainer
.receive(weatherRedisTopic)
.onBackpressureBuffer(1000)
.map(ReactiveSubscription.Message<String, String>::getMessage)
.map { msg ->
objectMapper.deserialize<List<RealtimeweatherDto>>(msg)
}
.subscribe(
{ realtimeweather: List<RealtimeweatherDto> ->
val weatherData: Map<String, RealtimeweatherDto> =
realtimeweather.associateBy({ it.weatherer.symbol + ":" + it.weatherer.country }, { it })
sink.tryEmitNext(weatherData)
},
{ err ->
logger.error(err) { "Error occurred in ${weatherRedisTopic.topic} redis subscription channel" }
}
)
我的预感是我在 pub-sub 通道中获取的数据太大,因此出现此错误。还有其他建议吗?
我怎样才能增加代码大小。
我正在使用 Spring webflux 2.5.1。有spring.codec.max-in-memory-size
属性 是不是应该增加
P.S:
这似乎与我们使用的WebClient有关。我们最近增加了编解码器大小
private val bufferedMemoryLimitInMb = 10 * 1024 * 1024
val webClient = WebClient.builder()
.codecs { configurer -> configurer.defaultCodecs().maxInMemorySize(bufferedMemoryLimitInMb ) }.build()
迁移到 Spring 启动 2.5.8 后。这个问题已为我解决。