YugabyteDB deployment in 2 datacenters, R2DBC driver error

[Question posted by a user on YugabyteDB Community Slack]

I am currently using YugabyteDB with the reactive Postgres driver (io.r2dbc:r2dbc-postgresql), and I am running into intermittent errors like the one in the stack trace below. I have been told that the Postgres driver may not handle YugabyteDB load balancing correctly, which could be the cause of this problem, and that the actual YugabyteDB driver might handle the situation properly. However, my code is reactive, which means I need an R2DBC driver, and I have not found any official R2DBC driver for YugabyteDB.

Do you think a more suitable driver would actually solve a problem like this? If so, is there another R2DBC driver better suited to my purpose? If not, do you have any suggestions for dealing with the error below?

23:37:07.239 [reactor-tcp-epoll-1] WARN  i.r.p.client.ReactorNettyClient - Error: SEVERITY_LOCALIZED=ERROR, SEVERITY_NON_LOCALIZED=ERROR, CODE=40001, MESSAGE=Query error: Restart read required at: { read: { physical: 1639179428477618 } local_limit: { physical: 1639179428477618 } global_limit: <min> in_txn_limit: <max> serial_no: 0 }, FILE=pg_yb_utils.c, LINE=333, ROUTINE=HandleYBStatusAtErrorLevel
23:37:07.247 [reactor-kafka-sender-1609501721] ERROR reactor.core.publisher.Operators - Operator called default onErrorDropped
reactor.core.Exceptions$ErrorCallbackNotImplemented: org.jooq.exception.DataAccessException: SQL [update "core"."videos" set "status" =  where "core"."videos"."media_key" =  returning "core"."videos"."user_id", "core"."videos"."media_key", "core"."videos"."post_id", "core"."videos"."status"]; Query error: Restart read required at: { read: { physical: 1639179428477618 } local_limit: { physical: 1639179428477618 } global_limit: <min> in_txn_limit: <max> serial_no: 0 }
Caused by: org.jooq.exception.DataAccessException: SQL [update "core"."videos" set "status" =  where "core"."videos"."media_key" =  returning "core"."videos"."user_id", "core"."videos"."media_key", "core"."videos"."post_id", "core"."videos"."status"]; Query error: Restart read required at: { read: { physical: 1639179428477618 } local_limit: { physical: 1639179428477618 } global_limit: <min> in_txn_limit: <max> serial_no: 0 }
    at org.jooq.impl.Tools.translate(Tools.java:2978)
    at org.jooq.impl.Tools.translate(Tools.java:2962)
    at org.jooq.impl.R2DBC$Forwarding.onError(R2DBC.java:236)
    at reactor.core.publisher.StrictSubscriber.onError(StrictSubscriber.java:106)
    at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onError(ScopePassingSpanSubscriber.java:96)
    at reactor.core.publisher.FluxHandle$HandleSubscriber.onError(FluxHandle.java:203)
    at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onError(MonoFlatMapMany.java:255)
    at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onError(ScopePassingSpanSubscriber.java:96)
    at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.onNext(FluxHandleFuseable.java:191)
    at reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onNext(FluxFilterFuseable.java:337)
    at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107)
    at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onNext(FluxPeekFuseable.java:854)
    at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:89)
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:200)
    at io.r2dbc.postgresql.util.FluxDiscardOnCancel$FluxDiscardOnCancelSubscriber.onNext(FluxDiscardOnCancel.java:86)
    at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:89)
    at reactor.core.publisher.FluxCreate$BufferAsyncSink.drain(FluxCreate.java:793)
    at reactor.core.publisher.FluxCreate$BufferAsyncSink.next(FluxCreate.java:718)
    at reactor.core.publisher.FluxCreate$SerializedFluxSink.next(FluxCreate.java:154)
    at io.r2dbc.postgresql.client.ReactorNettyClient$Conversation.emit(ReactorNettyClient.java:735)
    at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.emit(ReactorNettyClient.java:986)
    at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.onNext(ReactorNettyClient.java:860)
    at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.onNext(ReactorNettyClient.java:767)
    at reactor.core.publisher.FluxHandle$HandleSubscriber.onNext(FluxHandle.java:119)
    at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onNext(FluxPeekFuseable.java:854)
    at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:89)
    at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120)
    at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:89)
    at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120)
    at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:279)
    at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:388)
    at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:404)
    at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:93)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
    at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:795)
    at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:480)
    at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
    at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:986)
    at io.netty.util.internal.ThreadExecutorMap.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Thread.java:831)
Caused by: io.r2dbc.postgresql.ExceptionFactory$PostgresqlRollbackException: Query error: Restart read required at: { read: { physical: 1639179428477618 } local_limit: { physical: 1639179428477618 } global_limit: <min> in_txn_limit: <max> serial_no: 0 }
    at io.r2dbc.postgresql.ExceptionFactory.createException(ExceptionFactory.java:72)
    at io.r2dbc.postgresql.ExceptionFactory.handleErrorResponse(ExceptionFactory.java:111)
    at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.onNext(FluxHandleFuseable.java:169)
    ... 43 common frames omitted

Thank you very much!

[Answer] The exception in the stack trace is a restart read error. At the moment, YugabyteDB supports only optimistic locking with SNAPSHOT isolation, which means that whenever concurrent access results in a conflict, the driver throws a restart read error like the one below:

Caused by: io.r2dbc.postgresql.ExceptionFactory$PostgresqlRollbackException: Query error: Restart read required at: { read: { physical: 1639179428477618 } local_limit: { physical: 1639179428477618 } global_limit: <min> in_txn_limit: <max> serial_no: 0 }

You will need to handle the rollback exception and retry the operation. Support for repeatable read with pessimistic locking, which would avoid transaction retries, is on YugabyteDB's roadmap; until that feature is available, you need to retry the transaction on the client side.
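A minimal sketch of such a client-side retry, in plain Java. The `RestartReadException` class here is a hypothetical stand-in for the driver's `PostgresqlRollbackException` (SQLSTATE 40001), and `withRetries` is an illustrative helper, not part of any driver API: the idea is simply to re-run the whole transaction when a restart read conflict is raised.

```java
import java.util.function.Supplier;

// Hypothetical stand-in for io.r2dbc.postgresql.ExceptionFactory$PostgresqlRollbackException
// ("Restart read required", SQLSTATE 40001).
class RestartReadException extends RuntimeException {
    RestartReadException(String message) { super(message); }
}

public class RetryExample {

    // Run the operation, retrying up to maxAttempts times whenever a
    // restart read error is thrown. Each retry must re-execute the
    // entire transaction, not just the failed statement.
    static <T> T withRetries(Supplier<T> transaction, int maxAttempts) {
        RestartReadException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return transaction.get();
            } catch (RestartReadException e) {
                last = e; // transient conflict: loop and retry
            }
        }
        throw last; // attempts exhausted: surface the last conflict
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated transaction that conflicts twice, then succeeds.
        String result = withRetries(() -> {
            if (++calls[0] < 3) {
                throw new RestartReadException("Restart read required");
            }
            return "ok";
        }, 5);
        System.out.println(result + " after " + calls[0] + " attempts"); // prints "ok after 3 attempts"
    }
}
```

In a reactive pipeline you would express the same idea with Reactor's `retryWhen(Retry.max(...))` (or `Retry.backoff`) applied to the `Mono`/`Flux` that runs the transaction, filtering the retry to the rollback exception rather than retrying on every error.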