Timeout issues when using the Amazon Kinesis Client Library resulting in dropped records

When running a KCL consumer against a LocalStack instance, I ran into the following issue:

[INFO ] 2021-07-15 17:30:34.019 [software.amazon.kinesis.coordinator.DiagnosticEventLogger][task-1] DiagnosticEventLogger - Current thread pool executor state: ExecutorStateEvent(executorName=SchedulerThreadPoolExecutor, currentQueueSize=0, activeThreads=0, coreThreads=0, leasesOwned=1, largestPoolSize=2, maximumPoolSize=2147483647)
[WARN ] 2021-07-15 17:30:34.190 [software.amazon.kinesis.lifecycle.ShardConsumerSubscriber][ShardRecordProcessor-0000] ShardConsumerSubscriber - shardId-000000000000: onError().  Cancelling subscription, and marking self as failed. KCL will recreate the subscription as necessary to continue processing. If you are seeing this warning frequently consider increasing the SDK timeouts by providing an OverrideConfiguration to the kinesis client. Alternatively youcan configure LifecycleConfig.readTimeoutsToIgnoreBeforeWarning to suppressintermittent ReadTimeout warnings. Last successful request details -- request id - UNKNOWN, timestamp - 2021-07-15T17:30:20.005207Z
software.amazon.kinesis.retrieval.RetryableRetrievalException: ReadTimeout
    at software.amazon.kinesis.retrieval.fanout.FanOutRecordsPublisher.errorOccurred(FanOutRecordsPublisher.java:343) ~[amazon-kinesis-client-2.3.6.jar:?]
    at software.amazon.kinesis.retrieval.fanout.FanOutRecordsPublisher.access0(FanOutRecordsPublisher.java:68) ~[amazon-kinesis-client-2.3.6.jar:?]
    at software.amazon.kinesis.retrieval.fanout.FanOutRecordsPublisher$RecordFlow.executeExceptionOccurred(FanOutRecordsPublisher.java:802) ~[amazon-kinesis-client-2.3.6.jar:?]
    at software.amazon.kinesis.retrieval.fanout.FanOutRecordsPublisher$RecordFlow.exceptionOccurred(FanOutRecordsPublisher.java:778) ~[amazon-kinesis-client-2.3.6.jar:?]
    at software.amazon.awssdk.services.kinesis.DefaultKinesisAsyncClient.lambda$subscribeToShard(DefaultKinesisAsyncClient.java:2682) ~[kinesis-2.16.98.jar:?]
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) ~[?:?]
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) ~[?:?]
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) ~[?:?]
    at software.amazon.awssdk.utils.CompletableFutureUtils.lambda$forwardExceptionTo$0(CompletableFutureUtils.java:74) ~[utils-2.16.98.jar:?]
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) ~[?:?]
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) ~[?:?]
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) ~[?:?]
    at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncApiCallMetricCollectionStage.lambda$execute$0(AsyncApiCallMetricCollectionStage.java:54) ~[sdk-core-2.16.98.jar:?]
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) ~[?:?]
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) ~[?:?]
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) ~[?:?]
    at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncApiCallTimeoutTrackingStage.lambda$execute(AsyncApiCallTimeoutTrackingStage.java:67) ~[sdk-core-2.16.98.jar:?]
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) ~[?:?]
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) ~[?:?]
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) ~[?:?]
    at software.amazon.awssdk.utils.CompletableFutureUtils.lambda$forwardExceptionTo$0(CompletableFutureUtils.java:74) ~[utils-2.16.98.jar:?]
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) ~[?:?]
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) ~[?:?]
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) ~[?:?]
    at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.maybeAttemptExecute(AsyncRetryableStage.java:85) ~[sdk-core-2.16.98.jar:?]
    at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.maybeRetryExecute(AsyncRetryableStage.java:144) ~[sdk-core-2.16.98.jar:?]
    at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.lambda$attemptExecute(AsyncRetryableStage.java:125) ~[sdk-core-2.16.98.jar:?]
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) ~[?:?]
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) ~[?:?]
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) ~[?:?]
    at software.amazon.awssdk.utils.CompletableFutureUtils.lambda$forwardExceptionTo$0(CompletableFutureUtils.java:74) ~[utils-2.16.98.jar:?]
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) ~[?:?]
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) ~[?:?]
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) ~[?:?]
    at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage.lambda$null$0(MakeAsyncHttpRequestStage.java:104) ~[sdk-core-2.16.98.jar:?]
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) ~[?:?]
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) ~[?:?]
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) ~[?:?]
    at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage$WrappedErrorForwardingResponseHandler.onError(MakeAsyncHttpRequestStage.java:158) ~[sdk-core-2.16.98.jar:?]
    at software.amazon.awssdk.http.nio.netty.internal.ResponseHandler$PublisherAdapter.lambda$notifyError(ResponseHandler.java:309) ~[netty-nio-client-2.16.98.jar:?]
    at software.amazon.awssdk.http.nio.netty.internal.ResponseHandler.runAndLogError(ResponseHandler.java:181) ~[netty-nio-client-2.16.98.jar:?]
    at software.amazon.awssdk.http.nio.netty.internal.ResponseHandler.access0(ResponseHandler.java:71) ~[netty-nio-client-2.16.98.jar:?]
    at software.amazon.awssdk.http.nio.netty.internal.ResponseHandler$PublisherAdapter.notifyError(ResponseHandler.java:307) ~[netty-nio-client-2.16.98.jar:?]
    at software.amazon.awssdk.http.nio.netty.internal.ResponseHandler$PublisherAdapter.onError(ResponseHandler.java:283) ~[netty-nio-client-2.16.98.jar:?]
    at software.amazon.awssdk.http.nio.netty.internal.nrs.HandlerPublisher.exceptionCaught(HandlerPublisher.java:473) ~[netty-nio-client-2.16.98.jar:?]
    at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:302) ~[netty-transport-4.1.63.Final.jar:4.1.63.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:281) ~[netty-transport-4.1.63.Final.jar:4.1.63.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:273) ~[netty-transport-4.1.63.Final.jar:4.1.63.Final]
    at software.amazon.awssdk.http.nio.netty.internal.http2.Http2StreamExceptionHandler.exceptionCaught(Http2StreamExceptionHandler.java:53) ~[netty-nio-client-2.16.98.jar:?]
    at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:302) ~[netty-transport-4.1.63.Final.jar:4.1.63.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:281) ~[netty-transport-4.1.63.Final.jar:4.1.63.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:273) ~[netty-transport-4.1.63.Final.jar:4.1.63.Final]
    at software.amazon.awssdk.http.nio.netty.internal.UnusedChannelExceptionHandler.exceptionCaught(UnusedChannelExceptionHandler.java:52) ~[netty-nio-client-2.16.98.jar:?]
    at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:302) ~[netty-transport-4.1.63.Final.jar:4.1.63.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:281) ~[netty-transport-4.1.63.Final.jar:4.1.63.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:273) ~[netty-transport-4.1.63.Final.jar:4.1.63.Final]
    at io.netty.handler.timeout.ReadTimeoutHandler.readTimedOut(ReadTimeoutHandler.java:98) ~[netty-handler-4.1.63.Final.jar:4.1.63.Final]
    at io.netty.handler.timeout.ReadTimeoutHandler.channelIdle(ReadTimeoutHandler.java:90) ~[netty-handler-4.1.63.Final.jar:4.1.63.Final]
    at io.netty.handler.timeout.IdleStateHandler$ReaderIdleTimeoutTask.run(IdleStateHandler.java:504) ~[netty-handler-4.1.63.Final.jar:4.1.63.Final]
    at io.netty.handler.timeout.IdleStateHandler$AbstractIdleTask.run(IdleStateHandler.java:476) ~[netty-handler-4.1.63.Final.jar:4.1.63.Final]
    at io.netty.util.concurrent.PromiseTask.runTask(PromiseTask.java:98) ~[netty-common-4.1.63.Final.jar:4.1.63.Final]
    at io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:170) ~[netty-common-4.1.63.Final.jar:4.1.63.Final]
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164) ~[netty-common-4.1.63.Final.jar:4.1.63.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472) ~[netty-common-4.1.63.Final.jar:4.1.63.Final]
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500) ~[netty-transport-4.1.63.Final.jar:4.1.63.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:989) ~[netty-common-4.1.63.Final.jar:4.1.63.Final]
    at io.netty.util.internal.ThreadExecutorMap.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.63.Final.jar:4.1.63.Final]
    at java.lang.Thread.run(Thread.java:834) [?:?]
Caused by: software.amazon.awssdk.core.exception.SdkClientException: Unable to execute HTTP request: null
    at software.amazon.awssdk.core.exception.SdkClientException$BuilderImpl.build(SdkClientException.java:98) ~[sdk-core-2.16.98.jar:?]
    at software.amazon.awssdk.core.exception.SdkClientException.create(SdkClientException.java:43) ~[sdk-core-2.16.98.jar:?]
    at software.amazon.awssdk.core.internal.http.pipeline.stages.utils.RetryableStageHelper.setLastException(RetryableStageHelper.java:198) ~[sdk-core-2.16.98.jar:?]
    at software.amazon.awssdk.core.internal.http.pipeline.stages.utils.RetryableStageHelper.setLastException(RetryableStageHelper.java:194) ~[sdk-core-2.16.98.jar:?]
    at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.maybeRetryExecute(AsyncRetryableStage.java:143) ~[sdk-core-2.16.98.jar:?]
    ... 45 more
Caused by: io.netty.handler.timeout.ReadTimeoutException

The KinesisAsyncClient is created with:

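    import java.net.URI;
    import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
    import software.amazon.awssdk.services.kinesis.KinesisAsyncClientBuilder;
    import software.amazon.kinesis.common.KinesisClientUtil;

    KinesisAsyncClientBuilder kinesisAsyncClientBuilder = KinesisAsyncClient.builder()
            .region(region);
    if (endpoint != null) {
        // Point the client at LocalStack instead of the real AWS endpoint
        log.debug("AWS Endpoint for Kinesis Client is set to {}", endpoint);
        kinesisAsyncClientBuilder.endpointOverride(URI.create(endpoint));
    }
    return KinesisClientUtil.createKinesisAsyncClient(kinesisAsyncClientBuilder);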

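It is then passed to the Scheduler, which runs on a separate thread. When running integration tests, I insert records into the LocalStack Kinesis stream one at a time (with a 15-second delay between inserts), but only some of them are actually processed: sometimes 4 of 6, sometimes 2 of 6.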
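One way to make the dropped-record symptom fail deterministically in an integration test like this is to count processed records with a CountDownLatch and wait against a deadline. The sketch below is a stdlib-only model under stated assumptions: the BlockingQueue stands in for the LocalStack Kinesis stream, the single consumer thread stands in for the KCL Scheduler and record processor, RecordCountCheck and runTest are hypothetical names, and the 15-second spacing is scaled down to milliseconds.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical test harness: the queue models the stream and the consumer thread
 * models the KCL consumer. Neither is a KCL or AWS SDK API.
 */
public class RecordCountCheck {

    /** Inserts records with a delay, then returns what the consumer saw before the deadline. */
    static List<String> runTest(int recordCount, long delayMillis, long waitMillis) {
        BlockingQueue<String> stream = new LinkedBlockingQueue<>();
        List<String> processed = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch allSeen = new CountDownLatch(recordCount);

        ExecutorService consumer = Executors.newSingleThreadExecutor();
        consumer.submit(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    // Wait briefly for the next record, then loop again
                    String record = stream.poll(50, TimeUnit.MILLISECONDS);
                    if (record != null) {
                        processed.add(record);
                        allSeen.countDown(); // one count per processed record
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // shutdownNow() ends the loop here
            }
        });

        try {
            // Producer side: insert records one at a time with a delay between them
            for (int i = 0; i < recordCount; i++) {
                stream.put("record-" + i);
                Thread.sleep(delayMillis);
            }
            // Block until every record is processed or the deadline passes
            if (!allSeen.await(waitMillis, TimeUnit.MILLISECONDS)) {
                System.out.println("Only " + processed.size() + "/" + recordCount + " records processed");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            consumer.shutdownNow();
        }
        return new ArrayList<>(processed);
    }

    public static void main(String[] args) {
        // Scaled-down version of "6 records, 15 seconds apart"
        System.out.println(runTest(6, 20, 2_000));
    }
}
```

In a real test, the countDown() call would move into the processRecords method of your ShardRecordProcessor, and the deadline would need to cover the full 15-second spacing between inserts.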

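I have tried increasing the timeouts with ClientOverrideConfiguration, as well as supplying an HttpClient with increased timeouts, but nothing changed. Has anyone run into this before? I've found a number of similar issues online, and they all received an official reply, but none of the solutions in those replies worked for me.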

consumer-service.log

It turns out this was much simpler than I thought: I had not set up the Scheduler correctly.

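    import software.amazon.kinesis.coordinator.Scheduler;
    import software.amazon.kinesis.retrieval.polling.PollingConfig;

    return new Scheduler(
            configsBuilder.checkpointConfig(),
            configsBuilder.coordinatorConfig(),
            configsBuilder.leaseManagementConfig(),
            configsBuilder.lifecycleConfig(),
            configsBuilder.metricsConfig(),
            configsBuilder.processorConfig(),
            configsBuilder.retrievalConfig()
                    // Retrieve records by polling instead of the default enhanced fan-out
                    .retrievalSpecificConfig(new PollingConfig(awsConfig.getStreamName(), kinesisAsyncClient))
    );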

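If you run into the same error, make sure you have added the last line to the Scheduler configuration. I didn't have this line when I was getting the error, and once I added it the error went away. Without it, KCL 2.x uses its default enhanced fan-out retrieval (SubscribeToShard, which is where the FanOutRecordsPublisher frames in the stack trace come from); PollingConfig switches the consumer to polling the stream with GetRecords.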

    .retrievalSpecificConfig(new PollingConfig(awsConfig.getStreamName(), kinesisAsyncClient))
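To make the polling side of that difference concrete: with polling, each GetRecords-style call returns a (possibly empty) batch plus a position to resume from, so an idle stream just produces empty batches and nothing has to stay open between calls. The toy model below illustrates only that loop. FakeShard, Batch and simulate are hypothetical names, it is not how the KCL implements PollingConfig internally, and it says nothing about why SubscribeToShard against LocalStack timed out.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Toy model of poll-based shard retrieval. FakeShard, Batch and simulate() are
 * hypothetical illustrations, not KCL or AWS SDK APIs.
 */
public class PollingSketch {

    /** One GetRecords-style response: a possibly empty batch plus the position to resume from. */
    static final class Batch {
        final List<String> records;
        final int nextPosition;

        Batch(List<String> records, int nextPosition) {
            this.records = records;
            this.nextPosition = nextPosition;
        }
    }

    /** In-memory stand-in for a single shard. */
    static final class FakeShard {
        private final List<String> data = new ArrayList<>();

        void put(String record) {
            data.add(record);
        }

        /** Returns up to {@code limit} records starting at {@code position}; empty when caught up. */
        Batch getRecords(int position, int limit) {
            int end = Math.min(data.size(), position + limit);
            return new Batch(new ArrayList<>(data.subList(position, end)), end);
        }
    }

    /** Inserts records one at a time and polls several times after each insert. */
    static List<String> simulate(int recordCount, int pollsPerInsert) {
        FakeShard shard = new FakeShard();
        List<String> processed = new ArrayList<>();
        int position = 0; // plays the role of the shard iterator
        for (int i = 0; i < recordCount; i++) {
            shard.put("record-" + i);
            for (int p = 0; p < pollsPerInsert; p++) {
                // Idle polls return an empty batch instead of failing
                Batch batch = shard.getRecords(position, 10);
                processed.addAll(batch.records);
                position = batch.nextPosition;
            }
        }
        return processed;
    }

    public static void main(String[] args) {
        System.out.println(simulate(6, 3)); // all six records, in insertion order
    }
}
```

The trade-off this hints at: polling adds per-call latency and API calls, while fan-out relies on a long-lived stream that the server must keep feeding.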