使用 Amazon Kinesis Client Library 时出现超时问题导致记录丢失
Timeout issues when using Amazon Kinesis Client Library resulting in dropped records
当 运行针对 LocalStack 实例使用 KCL 消费者时,我遇到了以下问题:
[INFO ] 2021-07-15 17:30:34.019 [software.amazon.kinesis.coordinator.DiagnosticEventLogger][task-1] DiagnosticEventLogger - Current thread pool executor state: ExecutorStateEvent(executorName=SchedulerThreadPoolExecutor, currentQueueSize=0, activeThreads=0, coreThreads=0, leasesOwned=1, largestPoolSize=2, maximumPoolSize=2147483647)
[WARN ] 2021-07-15 17:30:34.190 [software.amazon.kinesis.lifecycle.ShardConsumerSubscriber][ShardRecordProcessor-0000] ShardConsumerSubscriber - shardId-000000000000: onError(). Cancelling subscription, and marking self as failed. KCL will recreate the subscription as necessary to continue processing. If you are seeing this warning frequently consider increasing the SDK timeouts by providing an OverrideConfiguration to the kinesis client. Alternatively youcan configure LifecycleConfig.readTimeoutsToIgnoreBeforeWarning to suppressintermittent ReadTimeout warnings. Last successful request details -- request id - UNKNOWN, timestamp - 2021-07-15T17:30:20.005207Z
software.amazon.kinesis.retrieval.RetryableRetrievalException: ReadTimeout
at software.amazon.kinesis.retrieval.fanout.FanOutRecordsPublisher.errorOccurred(FanOutRecordsPublisher.java:343) ~[amazon-kinesis-client-2.3.6.jar:?]
at software.amazon.kinesis.retrieval.fanout.FanOutRecordsPublisher.access0(FanOutRecordsPublisher.java:68) ~[amazon-kinesis-client-2.3.6.jar:?]
at software.amazon.kinesis.retrieval.fanout.FanOutRecordsPublisher$RecordFlow.executeExceptionOccurred(FanOutRecordsPublisher.java:802) ~[amazon-kinesis-client-2.3.6.jar:?]
at software.amazon.kinesis.retrieval.fanout.FanOutRecordsPublisher$RecordFlow.exceptionOccurred(FanOutRecordsPublisher.java:778) ~[amazon-kinesis-client-2.3.6.jar:?]
at software.amazon.awssdk.services.kinesis.DefaultKinesisAsyncClient.lambda$subscribeToShard(DefaultKinesisAsyncClient.java:2682) ~[kinesis-2.16.98.jar:?]
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) ~[?:?]
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) ~[?:?]
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) ~[?:?]
at software.amazon.awssdk.utils.CompletableFutureUtils.lambda$forwardExceptionTo[=10=](CompletableFutureUtils.java:74) ~[utils-2.16.98.jar:?]
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) ~[?:?]
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) ~[?:?]
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) ~[?:?]
at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncApiCallMetricCollectionStage.lambda$execute[=10=](AsyncApiCallMetricCollectionStage.java:54) ~[sdk-core-2.16.98.jar:?]
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) ~[?:?]
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) ~[?:?]
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) ~[?:?]
at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncApiCallTimeoutTrackingStage.lambda$execute(AsyncApiCallTimeoutTrackingStage.java:67) ~[sdk-core-2.16.98.jar:?]
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) ~[?:?]
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) ~[?:?]
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) ~[?:?]
at software.amazon.awssdk.utils.CompletableFutureUtils.lambda$forwardExceptionTo[=10=](CompletableFutureUtils.java:74) ~[utils-2.16.98.jar:?]
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) ~[?:?]
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) ~[?:?]
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) ~[?:?]
at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.maybeAttemptExecute(AsyncRetryableStage.java:85) ~[sdk-core-2.16.98.jar:?]
at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.maybeRetryExecute(AsyncRetryableStage.java:144) ~[sdk-core-2.16.98.jar:?]
at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.lambda$attemptExecute(AsyncRetryableStage.java:125) ~[sdk-core-2.16.98.jar:?]
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) ~[?:?]
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) ~[?:?]
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) ~[?:?]
at software.amazon.awssdk.utils.CompletableFutureUtils.lambda$forwardExceptionTo[=10=](CompletableFutureUtils.java:74) ~[utils-2.16.98.jar:?]
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) ~[?:?]
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) ~[?:?]
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) ~[?:?]
at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage.lambda$null[=10=](MakeAsyncHttpRequestStage.java:104) ~[sdk-core-2.16.98.jar:?]
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) ~[?:?]
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) ~[?:?]
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) ~[?:?]
at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage$WrappedErrorForwardingResponseHandler.onError(MakeAsyncHttpRequestStage.java:158) ~[sdk-core-2.16.98.jar:?]
at software.amazon.awssdk.http.nio.netty.internal.ResponseHandler$PublisherAdapter.lambda$notifyError(ResponseHandler.java:309) ~[netty-nio-client-2.16.98.jar:?]
at software.amazon.awssdk.http.nio.netty.internal.ResponseHandler.runAndLogError(ResponseHandler.java:181) ~[netty-nio-client-2.16.98.jar:?]
at software.amazon.awssdk.http.nio.netty.internal.ResponseHandler.access0(ResponseHandler.java:71) ~[netty-nio-client-2.16.98.jar:?]
at software.amazon.awssdk.http.nio.netty.internal.ResponseHandler$PublisherAdapter.notifyError(ResponseHandler.java:307) ~[netty-nio-client-2.16.98.jar:?]
at software.amazon.awssdk.http.nio.netty.internal.ResponseHandler$PublisherAdapter.onError(ResponseHandler.java:283) ~[netty-nio-client-2.16.98.jar:?]
at software.amazon.awssdk.http.nio.netty.internal.nrs.HandlerPublisher.exceptionCaught(HandlerPublisher.java:473) ~[netty-nio-client-2.16.98.jar:?]
at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:302) ~[netty-transport-4.1.63.Final.jar:4.1.63.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:281) ~[netty-transport-4.1.63.Final.jar:4.1.63.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:273) ~[netty-transport-4.1.63.Final.jar:4.1.63.Final]
at software.amazon.awssdk.http.nio.netty.internal.http2.Http2StreamExceptionHandler.exceptionCaught(Http2StreamExceptionHandler.java:53) ~[netty-nio-client-2.16.98.jar:?]
at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:302) ~[netty-transport-4.1.63.Final.jar:4.1.63.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:281) ~[netty-transport-4.1.63.Final.jar:4.1.63.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:273) ~[netty-transport-4.1.63.Final.jar:4.1.63.Final]
at software.amazon.awssdk.http.nio.netty.internal.UnusedChannelExceptionHandler.exceptionCaught(UnusedChannelExceptionHandler.java:52) ~[netty-nio-client-2.16.98.jar:?]
at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:302) ~[netty-transport-4.1.63.Final.jar:4.1.63.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:281) ~[netty-transport-4.1.63.Final.jar:4.1.63.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:273) ~[netty-transport-4.1.63.Final.jar:4.1.63.Final]
at io.netty.handler.timeout.ReadTimeoutHandler.readTimedOut(ReadTimeoutHandler.java:98) ~[netty-handler-4.1.63.Final.jar:4.1.63.Final]
at io.netty.handler.timeout.ReadTimeoutHandler.channelIdle(ReadTimeoutHandler.java:90) ~[netty-handler-4.1.63.Final.jar:4.1.63.Final]
at io.netty.handler.timeout.IdleStateHandler$ReaderIdleTimeoutTask.run(IdleStateHandler.java:504) ~[netty-handler-4.1.63.Final.jar:4.1.63.Final]
at io.netty.handler.timeout.IdleStateHandler$AbstractIdleTask.run(IdleStateHandler.java:476) ~[netty-handler-4.1.63.Final.jar:4.1.63.Final]
at io.netty.util.concurrent.PromiseTask.runTask(PromiseTask.java:98) ~[netty-common-4.1.63.Final.jar:4.1.63.Final]
at io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:170) ~[netty-common-4.1.63.Final.jar:4.1.63.Final]
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164) ~[netty-common-4.1.63.Final.jar:4.1.63.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472) ~[netty-common-4.1.63.Final.jar:4.1.63.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500) ~[netty-transport-4.1.63.Final.jar:4.1.63.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:989) ~[netty-common-4.1.63.Final.jar:4.1.63.Final]
at io.netty.util.internal.ThreadExecutorMap.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.63.Final.jar:4.1.63.Final]
at java.lang.Thread.run(Thread.java:834) [?:?]
Caused by: software.amazon.awssdk.core.exception.SdkClientException: Unable to execute HTTP request: null
at software.amazon.awssdk.core.exception.SdkClientException$BuilderImpl.build(SdkClientException.java:98) ~[sdk-core-2.16.98.jar:?]
at software.amazon.awssdk.core.exception.SdkClientException.create(SdkClientException.java:43) ~[sdk-core-2.16.98.jar:?]
at software.amazon.awssdk.core.internal.http.pipeline.stages.utils.RetryableStageHelper.setLastException(RetryableStageHelper.java:198) ~[sdk-core-2.16.98.jar:?]
at software.amazon.awssdk.core.internal.http.pipeline.stages.utils.RetryableStageHelper.setLastException(RetryableStageHelper.java:194) ~[sdk-core-2.16.98.jar:?]
at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.maybeRetryExecute(AsyncRetryableStage.java:143) ~[sdk-core-2.16.98.jar:?]
... 45 more
Caused by: io.netty.handler.timeout.ReadTimeoutException
KinesisAsyncClient 创建时使用:
KinesisAsyncClientBuilder kinesisAsyncClientBuilder = KinesisAsyncClient.builder()
.region(region);
if (endpoint != null) {
log.debug("AWS Endpoint for Kinesis Client is set to {}", endpoint);
kinesisAsyncClientBuilder.endpointOverride(URI.create(endpoint));
}
return KinesisClientUtil.createKinesisAsyncClient(kinesisAsyncClientBuilder);
并传递到 Scheduler
,即 运行 在单独的线程上。当 运行ning 集成测试时,我将记录一条一条地插入 LocalStack 运动流(插入之间有 15 秒的延迟),但实际上只处理了一些记录。有时处理 4/6 的记录,有时处理 2/6。
我已经尝试使用 ClientOverrideConfiguration 增加超时以及为 HttpClient 提供增加的超时但没有任何更改。以前有人遇到过这个问题吗?我在网上看到了一堆类似的问题,但他们都得到了一个官方回复,并且 none 回复中的解决方案有效。
原来这比我想象的要简单得多。我未能正确设置调度程序。
return new Scheduler(
configsBuilder.checkpointConfig(),
configsBuilder.coordinatorConfig(),
configsBuilder.leaseManagementConfig(),
configsBuilder.lifecycleConfig(),
configsBuilder.metricsConfig(),
configsBuilder.processorConfig(),
configsBuilder.retrievalConfig()
.retrievalSpecificConfig(new PollingConfig(awsConfig.getStreamName(), kinesisAsyncClient))
);
如果您 运行 遇到同样的错误,我确实确保您已将最后一行添加到调度程序中。当我收到此错误时,我没有此行。添加该行后,错误就消失了。
.retrievalSpecificConfig(new PollingConfig(awsConfig.getStreamName(), kinesisAsyncClient))
当 运行针对 LocalStack 实例使用 KCL 消费者时,我遇到了以下问题:
[INFO ] 2021-07-15 17:30:34.019 [software.amazon.kinesis.coordinator.DiagnosticEventLogger][task-1] DiagnosticEventLogger - Current thread pool executor state: ExecutorStateEvent(executorName=SchedulerThreadPoolExecutor, currentQueueSize=0, activeThreads=0, coreThreads=0, leasesOwned=1, largestPoolSize=2, maximumPoolSize=2147483647)
[WARN ] 2021-07-15 17:30:34.190 [software.amazon.kinesis.lifecycle.ShardConsumerSubscriber][ShardRecordProcessor-0000] ShardConsumerSubscriber - shardId-000000000000: onError(). Cancelling subscription, and marking self as failed. KCL will recreate the subscription as necessary to continue processing. If you are seeing this warning frequently consider increasing the SDK timeouts by providing an OverrideConfiguration to the kinesis client. Alternatively youcan configure LifecycleConfig.readTimeoutsToIgnoreBeforeWarning to suppressintermittent ReadTimeout warnings. Last successful request details -- request id - UNKNOWN, timestamp - 2021-07-15T17:30:20.005207Z
software.amazon.kinesis.retrieval.RetryableRetrievalException: ReadTimeout
at software.amazon.kinesis.retrieval.fanout.FanOutRecordsPublisher.errorOccurred(FanOutRecordsPublisher.java:343) ~[amazon-kinesis-client-2.3.6.jar:?]
at software.amazon.kinesis.retrieval.fanout.FanOutRecordsPublisher.access0(FanOutRecordsPublisher.java:68) ~[amazon-kinesis-client-2.3.6.jar:?]
at software.amazon.kinesis.retrieval.fanout.FanOutRecordsPublisher$RecordFlow.executeExceptionOccurred(FanOutRecordsPublisher.java:802) ~[amazon-kinesis-client-2.3.6.jar:?]
at software.amazon.kinesis.retrieval.fanout.FanOutRecordsPublisher$RecordFlow.exceptionOccurred(FanOutRecordsPublisher.java:778) ~[amazon-kinesis-client-2.3.6.jar:?]
at software.amazon.awssdk.services.kinesis.DefaultKinesisAsyncClient.lambda$subscribeToShard(DefaultKinesisAsyncClient.java:2682) ~[kinesis-2.16.98.jar:?]
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) ~[?:?]
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) ~[?:?]
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) ~[?:?]
at software.amazon.awssdk.utils.CompletableFutureUtils.lambda$forwardExceptionTo[=10=](CompletableFutureUtils.java:74) ~[utils-2.16.98.jar:?]
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) ~[?:?]
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) ~[?:?]
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) ~[?:?]
at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncApiCallMetricCollectionStage.lambda$execute[=10=](AsyncApiCallMetricCollectionStage.java:54) ~[sdk-core-2.16.98.jar:?]
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) ~[?:?]
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) ~[?:?]
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) ~[?:?]
at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncApiCallTimeoutTrackingStage.lambda$execute(AsyncApiCallTimeoutTrackingStage.java:67) ~[sdk-core-2.16.98.jar:?]
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) ~[?:?]
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) ~[?:?]
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) ~[?:?]
at software.amazon.awssdk.utils.CompletableFutureUtils.lambda$forwardExceptionTo[=10=](CompletableFutureUtils.java:74) ~[utils-2.16.98.jar:?]
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) ~[?:?]
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) ~[?:?]
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) ~[?:?]
at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.maybeAttemptExecute(AsyncRetryableStage.java:85) ~[sdk-core-2.16.98.jar:?]
at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.maybeRetryExecute(AsyncRetryableStage.java:144) ~[sdk-core-2.16.98.jar:?]
at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.lambda$attemptExecute(AsyncRetryableStage.java:125) ~[sdk-core-2.16.98.jar:?]
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) ~[?:?]
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) ~[?:?]
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) ~[?:?]
at software.amazon.awssdk.utils.CompletableFutureUtils.lambda$forwardExceptionTo[=10=](CompletableFutureUtils.java:74) ~[utils-2.16.98.jar:?]
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) ~[?:?]
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) ~[?:?]
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) ~[?:?]
at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage.lambda$null[=10=](MakeAsyncHttpRequestStage.java:104) ~[sdk-core-2.16.98.jar:?]
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) ~[?:?]
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) ~[?:?]
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) ~[?:?]
at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage$WrappedErrorForwardingResponseHandler.onError(MakeAsyncHttpRequestStage.java:158) ~[sdk-core-2.16.98.jar:?]
at software.amazon.awssdk.http.nio.netty.internal.ResponseHandler$PublisherAdapter.lambda$notifyError(ResponseHandler.java:309) ~[netty-nio-client-2.16.98.jar:?]
at software.amazon.awssdk.http.nio.netty.internal.ResponseHandler.runAndLogError(ResponseHandler.java:181) ~[netty-nio-client-2.16.98.jar:?]
at software.amazon.awssdk.http.nio.netty.internal.ResponseHandler.access0(ResponseHandler.java:71) ~[netty-nio-client-2.16.98.jar:?]
at software.amazon.awssdk.http.nio.netty.internal.ResponseHandler$PublisherAdapter.notifyError(ResponseHandler.java:307) ~[netty-nio-client-2.16.98.jar:?]
at software.amazon.awssdk.http.nio.netty.internal.ResponseHandler$PublisherAdapter.onError(ResponseHandler.java:283) ~[netty-nio-client-2.16.98.jar:?]
at software.amazon.awssdk.http.nio.netty.internal.nrs.HandlerPublisher.exceptionCaught(HandlerPublisher.java:473) ~[netty-nio-client-2.16.98.jar:?]
at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:302) ~[netty-transport-4.1.63.Final.jar:4.1.63.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:281) ~[netty-transport-4.1.63.Final.jar:4.1.63.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:273) ~[netty-transport-4.1.63.Final.jar:4.1.63.Final]
at software.amazon.awssdk.http.nio.netty.internal.http2.Http2StreamExceptionHandler.exceptionCaught(Http2StreamExceptionHandler.java:53) ~[netty-nio-client-2.16.98.jar:?]
at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:302) ~[netty-transport-4.1.63.Final.jar:4.1.63.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:281) ~[netty-transport-4.1.63.Final.jar:4.1.63.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:273) ~[netty-transport-4.1.63.Final.jar:4.1.63.Final]
at software.amazon.awssdk.http.nio.netty.internal.UnusedChannelExceptionHandler.exceptionCaught(UnusedChannelExceptionHandler.java:52) ~[netty-nio-client-2.16.98.jar:?]
at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:302) ~[netty-transport-4.1.63.Final.jar:4.1.63.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:281) ~[netty-transport-4.1.63.Final.jar:4.1.63.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:273) ~[netty-transport-4.1.63.Final.jar:4.1.63.Final]
at io.netty.handler.timeout.ReadTimeoutHandler.readTimedOut(ReadTimeoutHandler.java:98) ~[netty-handler-4.1.63.Final.jar:4.1.63.Final]
at io.netty.handler.timeout.ReadTimeoutHandler.channelIdle(ReadTimeoutHandler.java:90) ~[netty-handler-4.1.63.Final.jar:4.1.63.Final]
at io.netty.handler.timeout.IdleStateHandler$ReaderIdleTimeoutTask.run(IdleStateHandler.java:504) ~[netty-handler-4.1.63.Final.jar:4.1.63.Final]
at io.netty.handler.timeout.IdleStateHandler$AbstractIdleTask.run(IdleStateHandler.java:476) ~[netty-handler-4.1.63.Final.jar:4.1.63.Final]
at io.netty.util.concurrent.PromiseTask.runTask(PromiseTask.java:98) ~[netty-common-4.1.63.Final.jar:4.1.63.Final]
at io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:170) ~[netty-common-4.1.63.Final.jar:4.1.63.Final]
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164) ~[netty-common-4.1.63.Final.jar:4.1.63.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472) ~[netty-common-4.1.63.Final.jar:4.1.63.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500) ~[netty-transport-4.1.63.Final.jar:4.1.63.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:989) ~[netty-common-4.1.63.Final.jar:4.1.63.Final]
at io.netty.util.internal.ThreadExecutorMap.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.63.Final.jar:4.1.63.Final]
at java.lang.Thread.run(Thread.java:834) [?:?]
Caused by: software.amazon.awssdk.core.exception.SdkClientException: Unable to execute HTTP request: null
at software.amazon.awssdk.core.exception.SdkClientException$BuilderImpl.build(SdkClientException.java:98) ~[sdk-core-2.16.98.jar:?]
at software.amazon.awssdk.core.exception.SdkClientException.create(SdkClientException.java:43) ~[sdk-core-2.16.98.jar:?]
at software.amazon.awssdk.core.internal.http.pipeline.stages.utils.RetryableStageHelper.setLastException(RetryableStageHelper.java:198) ~[sdk-core-2.16.98.jar:?]
at software.amazon.awssdk.core.internal.http.pipeline.stages.utils.RetryableStageHelper.setLastException(RetryableStageHelper.java:194) ~[sdk-core-2.16.98.jar:?]
at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.maybeRetryExecute(AsyncRetryableStage.java:143) ~[sdk-core-2.16.98.jar:?]
... 45 more
Caused by: io.netty.handler.timeout.ReadTimeoutException
KinesisAsyncClient 创建时使用:
KinesisAsyncClientBuilder kinesisAsyncClientBuilder = KinesisAsyncClient.builder()
.region(region);
if (endpoint != null) {
log.debug("AWS Endpoint for Kinesis Client is set to {}", endpoint);
kinesisAsyncClientBuilder.endpointOverride(URI.create(endpoint));
}
return KinesisClientUtil.createKinesisAsyncClient(kinesisAsyncClientBuilder);
并传递到 Scheduler
,即 运行 在单独的线程上。当 运行ning 集成测试时,我将记录一条一条地插入 LocalStack 运动流(插入之间有 15 秒的延迟),但实际上只处理了一些记录。有时处理 4/6 的记录,有时处理 2/6。
我已经尝试使用 ClientOverrideConfiguration 增加超时以及为 HttpClient 提供增加的超时但没有任何更改。以前有人遇到过这个问题吗?我在网上看到了一堆类似的问题,但他们都得到了一个官方回复,并且 none 回复中的解决方案有效。
原来这比我想象的要简单得多。我未能正确设置调度程序。
return new Scheduler(
configsBuilder.checkpointConfig(),
configsBuilder.coordinatorConfig(),
configsBuilder.leaseManagementConfig(),
configsBuilder.lifecycleConfig(),
configsBuilder.metricsConfig(),
configsBuilder.processorConfig(),
configsBuilder.retrievalConfig()
.retrievalSpecificConfig(new PollingConfig(awsConfig.getStreamName(), kinesisAsyncClient))
);
如果您 运行 遇到同样的错误,我确实确保您已将最后一行添加到调度程序中。当我收到此错误时,我没有此行。添加该行后,错误就消失了。
.retrievalSpecificConfig(new PollingConfig(awsConfig.getStreamName(), kinesisAsyncClient))