YugabyteDB deployment in 2 datacenters, R2DBC driver error
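[Question posted by a user on the YugabyteDB Community Slack]
I am currently using YugabyteDB with the reactive Postgres driver (io.r2dbc:r2dbc-postgresql), and I am running into intermittent errors like the one in the stack trace below.
I was told that the Postgres driver may not handle Yugabyte load balancing correctly, that this could be what causes the problem, and that the actual Yugabyte driver would handle this situation properly.
However, my code is reactive, so I need an R2DBC driver, and I have not found any official R2DBC Yugabyte driver.
Do you think a more suitable driver would really solve a problem like this?
If so, is there another R2DBC driver that would fit my use case better?
If not, do you have any suggestions for resolving the following error?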
23:37:07.239 [reactor-tcp-epoll-1] WARN i.r.p.client.ReactorNettyClient - Error: SEVERITY_LOCALIZED=ERROR, SEVERITY_NON_LOCALIZED=ERROR, CODE=40001, MESSAGE=Query error: Restart read required at: { read: { physical: 1639179428477618 } local_limit: { physical: 1639179428477618 } global_limit: <min> in_txn_limit: <max> serial_no: 0 }, FILE=pg_yb_utils.c, LINE=333, ROUTINE=HandleYBStatusAtErrorLevel
23:37:07.247 [reactor-kafka-sender-1609501721] ERROR reactor.core.publisher.Operators - Operator called default onErrorDropped
reactor.core.Exceptions$ErrorCallbackNotImplemented: org.jooq.exception.DataAccessException: SQL [update "core"."videos" set "status" = where "core"."videos"."media_key" = returning "core"."videos"."user_id", "core"."videos"."media_key", "core"."videos"."post_id", "core"."videos"."status"]; Query error: Restart read required at: { read: { physical: 1639179428477618 } local_limit: { physical: 1639179428477618 } global_limit: <min> in_txn_limit: <max> serial_no: 0 }
Caused by: org.jooq.exception.DataAccessException: SQL [update "core"."videos" set "status" = where "core"."videos"."media_key" = returning "core"."videos"."user_id", "core"."videos"."media_key", "core"."videos"."post_id", "core"."videos"."status"]; Query error: Restart read required at: { read: { physical: 1639179428477618 } local_limit: { physical: 1639179428477618 } global_limit: <min> in_txn_limit: <max> serial_no: 0 }
at org.jooq.impl.Tools.translate(Tools.java:2978)
at org.jooq.impl.Tools.translate(Tools.java:2962)
at org.jooq.impl.R2DBC$Forwarding.onError(R2DBC.java:236)
at reactor.core.publisher.StrictSubscriber.onError(StrictSubscriber.java:106)
at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onError(ScopePassingSpanSubscriber.java:96)
at reactor.core.publisher.FluxHandle$HandleSubscriber.onError(FluxHandle.java:203)
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onError(MonoFlatMapMany.java:255)
at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onError(ScopePassingSpanSubscriber.java:96)
at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.onNext(FluxHandleFuseable.java:191)
at reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onNext(FluxFilterFuseable.java:337)
at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107)
at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onNext(FluxPeekFuseable.java:854)
at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:89)
at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:200)
at io.r2dbc.postgresql.util.FluxDiscardOnCancel$FluxDiscardOnCancelSubscriber.onNext(FluxDiscardOnCancel.java:86)
at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:89)
at reactor.core.publisher.FluxCreate$BufferAsyncSink.drain(FluxCreate.java:793)
at reactor.core.publisher.FluxCreate$BufferAsyncSink.next(FluxCreate.java:718)
at reactor.core.publisher.FluxCreate$SerializedFluxSink.next(FluxCreate.java:154)
at io.r2dbc.postgresql.client.ReactorNettyClient$Conversation.emit(ReactorNettyClient.java:735)
at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.emit(ReactorNettyClient.java:986)
at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.onNext(ReactorNettyClient.java:860)
at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.onNext(ReactorNettyClient.java:767)
at reactor.core.publisher.FluxHandle$HandleSubscriber.onNext(FluxHandle.java:119)
at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onNext(FluxPeekFuseable.java:854)
at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:89)
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120)
at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:89)
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120)
at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:279)
at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:388)
at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:404)
at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:93)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:795)
at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:480)
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:986)
at io.netty.util.internal.ThreadExecutorMap.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:831)
Caused by: io.r2dbc.postgresql.ExceptionFactory$PostgresqlRollbackException: Query error: Restart read required at: { read: { physical: 1639179428477618 } local_limit: { physical: 1639179428477618 } global_limit: <min> in_txn_limit: <max> serial_no: 0 }
at io.r2dbc.postgresql.ExceptionFactory.createException(ExceptionFactory.java:72)
at io.r2dbc.postgresql.ExceptionFactory.handleErrorResponse(ExceptionFactory.java:111)
at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.onNext(FluxHandleFuseable.java:169)
... 43 common frames omitted
Thank you very much!
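The exception stack trace is related to Restart read errors. Currently, YugabyteDB supports only optimistic locking with the SNAPSHOT isolation level, which means that whenever concurrent accesses conflict, the driver surfaces a restart read error (SQLSTATE 40001 in the log above), like this: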
Caused by: io.r2dbc.postgresql.ExceptionFactory$PostgresqlRollbackException: Query error: Restart read required at: { read: { physical: 1639179428477618 } local_limit: { physical: 1639179428477618 } global_limit: <min> in_txn_limit: <max> serial_no: 0 }
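You will need to handle the rollback exception and retry the operation. YugabyteDB's roadmap includes support for repeatable read with pessimistic locking, which would avoid these transaction retries. Until that feature is available, you need to retry the transaction on the client side.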
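As a rough illustration of the client-side retry described above, here is a minimal sketch in plain Java using only the standard library. It is not the poster's code and not an official YugabyteDB recipe. The names `RetryOnConflict`, `SqlStateException`, `isSerializationFailure`, and `withRetry` are hypothetical, and `SqlStateException` is only a stand-in for whatever driver exception carries the SQLSTATE in a real application. The sketch retries an asynchronous operation only when SQLSTATE 40001 appears somewhere in the cause chain, and gives up after a fixed number of attempts.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

final class RetryOnConflict {

    /** Hypothetical stand-in for a driver exception that exposes a SQLSTATE code. */
    static final class SqlStateException extends RuntimeException {
        private final String sqlState;

        SqlStateException(String sqlState, String message) {
            super(message);
            this.sqlState = sqlState;
        }

        String sqlState() {
            return sqlState;
        }
    }

    /** Returns true if any exception in the cause chain reports SQLSTATE 40001. */
    static boolean isSerializationFailure(Throwable error) {
        for (Throwable t = error; t != null; t = t.getCause()) {
            if (t instanceof SqlStateException
                    && "40001".equals(((SqlStateException) t).sqlState())) {
                return true;
            }
        }
        return false;
    }

    /** Runs the operation, re-running it on SQLSTATE 40001 up to maxAttempts times in total. */
    static <T> CompletableFuture<T> withRetry(Supplier<CompletableFuture<T>> operation,
                                              int maxAttempts) {
        return attempt(operation, 1, maxAttempts);
    }

    private static <T> CompletableFuture<T> attempt(Supplier<CompletableFuture<T>> operation,
                                                    int attemptNo, int maxAttempts) {
        return operation.get()
                .<CompletableFuture<T>>handle((value, error) -> {
                    if (error == null) {
                        // Success: pass the value through unchanged.
                        return CompletableFuture.completedFuture(value);
                    }
                    if (attemptNo < maxAttempts && isSerializationFailure(error)) {
                        // Conflict: run the whole operation again from the start.
                        return attempt(operation, attemptNo + 1, maxAttempts);
                    }
                    // Non-retryable error, or attempts exhausted: propagate it.
                    CompletableFuture<T> failed = new CompletableFuture<>();
                    failed.completeExceptionally(error);
                    return failed;
                })
                // Flatten the nested future returned by handle().
                .thenCompose(next -> next);
    }
}
```

The supplier must re-create the whole transaction on each call, because a restarted read invalidates the previous attempt. This sketch retries immediately; a real version would probably wait between attempts. In a Reactor pipeline such as the one in the stack trace, the same idea is usually expressed with `retryWhen(Retry.backoff(...).filter(...))` from `reactor.util.retry.Retry`, with a filter that inspects the SQLSTATE of the underlying `R2dbcException`. How that exception is wrapped by jOOQ in this application is an assumption to verify.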