临时文件 s3 随光束变慢(节流)
Temp files s3 slow down with beam (throtteling)
我正在尝试在 AWS KDA 上将 Apache Beam 与 Flink 结合使用。该管道从运动中读取一些数据,经过简单转换后,它尝试使用以下策略将结果分组到一个分片中:
.apply("Creating micro-batches", Window.<XXX>into(new GlobalWindows())
.triggering(Repeatedly.forever(AfterFirst.of(AfterPane.elementCountAtLeast(windowSizePanes),
AfterProcessingTime.pastFirstElementInPane().alignedTo(windowSizeSeconds))))
.withAllowedLateness(Duration.standardMinutes(60), Window.ClosingBehavior.FIRE_IF_NON_EMPTY)
.discardingFiredPanes()
);
问题是,我遇到了很多 s3 减速异常,并且在发生错误的同一时刻只有 100 个临时文件:
org.apache.flink.streaming.runtime.tasks.AsynchronousException: Caught exception while processing timer.
at org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:1319)
at org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:1295)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1434)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$deferCallbackToMailbox(StreamTask.java:1423)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:50)
at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:344)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:330)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:202)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)
at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:784)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:571)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: TimerException{org.apache.beam.sdk.util.UserCodeException: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator}
... 14 more
Caused by: org.apache.beam.sdk.util.UserCodeException: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn$DoFnInvoker.invokeProcessElement(Unknown Source)
at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:232)
at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:191)
at org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:80)
at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)
at org.apache.beam.runners.flink.translation.wrappers.streaming.WindowDoFnOperator.fireTimer(WindowDoFnOperator.java:131)
at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.fireTimerInternal(DoFnOperator.java:1001)
at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.onProcessingTime(DoFnOperator.java:995)
at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:284)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1432)
... 13 more
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:88)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:1171)
at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:1121)
at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn.outputWindowedValue(GroupAlsoByWindowViaWindowSetNewDoFn.java:99)
at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn.outputWindowedValue(GroupAlsoByWindowViaWindowSetNewDoFn.java:92)
at org.apache.beam.runners.core.ReduceFnRunner.lambda$onTrigger(ReduceFnRunner.java:1058)
at org.apache.beam.runners.core.ReduceFnContextFactory$OnTriggerContextImpl.output(ReduceFnContextFactory.java:445)
at org.apache.beam.runners.core.SystemReduceFn.onTrigger(SystemReduceFn.java:130)
at org.apache.beam.runners.core.ReduceFnRunner.onTrigger(ReduceFnRunner.java:1061)
at org.apache.beam.runners.core.ReduceFnRunner.emit(ReduceFnRunner.java:932)
at org.apache.beam.runners.core.ReduceFnRunner.onTimers(ReduceFnRunner.java:793)
at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn.processElement(GroupAlsoByWindowViaWindowSetNewDoFn.java:137)
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:88)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:50)
at org.apache.beam.runners.flink.FlinkStreamingTransformTranslators$ToGroupByKeyResult.flatMap(FlinkStreamingTransformTranslators.java:1416)
at org.apache.beam.runners.flink.FlinkStreamingTransformTranslators$ToGroupByKeyResult.flatMap(FlinkStreamingTransformTranslators.java:1388)
at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:47)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:1171)
at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:1121)
at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn.outputWindowedValue(GroupAlsoByWindowViaWindowSetNewDoFn.java:99)
at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn.outputWindowedValue(GroupAlsoByWindowViaWindowSetNewDoFn.java:92)
at org.apache.beam.runners.core.ReduceFnRunner.lambda$onTrigger(ReduceFnRunner.java:1058)
at org.apache.beam.runners.core.ReduceFnContextFactory$OnTriggerContextImpl.output(ReduceFnContextFactory.java:445)
at org.apache.beam.runners.core.SystemReduceFn.onTrigger(SystemReduceFn.java:130)
at org.apache.beam.runners.core.ReduceFnRunner.onTrigger(ReduceFnRunner.java:1061)
at org.apache.beam.runners.core.ReduceFnRunner.emit(ReduceFnRunner.java:932)
at org.apache.beam.runners.core.ReduceFnRunner.onTimers(ReduceFnRunner.java:793)
at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn.processElement(GroupAlsoByWindowViaWindowSetNewDoFn.java:137)
at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn$DoFnInvoker.invokeProcessElement(Unknown Source)
at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:232)
at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:191)
at org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:80)
at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)
at org.apache.beam.runners.flink.translation.wrappers.streaming.WindowDoFnOperator.fireTimer(WindowDoFnOperator.java:131)
at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.fireTimerInternal(DoFnOperator.java:1001)
at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.onProcessingTime(DoFnOperator.java:995)
at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:284)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1432)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$deferCallbackToMailbox(StreamTask.java:1423)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:50)
at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:344)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:330)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:202)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)
at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:784)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:571)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.beam.sdk.util.UserCodeException: java.io.IOException: com.amazonaws.services.s3.model.AmazonS3Exception: Please reduce your request rate. (Service: Amazon S3; Status Code: 503; Error Code: SlowDown; Request ID: 5HFF75416KXD5RMN; S3 Extended Request ID: 2RZe+jaSpkhPEVt1VgPoZ/CsE51SwMQJA4BWVR4qyXB7TazD43NdsEj19AnKKleswZSyo964R0A=; Proxy: null), S3 Extended Request ID: 2RZe+jaSpkhPEVt1VgPoZ/CsE51SwMQJA4BWVR4qyXB7TazD43NdsEj19AnKKleswZSyo964R0A=
at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
at org.apache.beam.sdk.io.WriteFiles$WriteShardsIntoTempFilesFn$DoFnInvoker.invokeProcessElement(Unknown Source)
at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:232)
at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:188)
at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)
at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:645)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
... 47 more
Caused by: java.io.IOException: com.amazonaws.services.s3.model.AmazonS3Exception: Please reduce your request rate. (Service: Amazon S3; Status Code: 503; Error Code: SlowDown; Request ID: 5HFF75416KXD5RMN; S3 Extended Request ID: 2RZe+jaSpkhPEVt1VgPoZ/CsE51SwMQJA4BWVR4qyXB7TazD43NdsEj19AnKKleswZSyo964R0A=; Proxy: null), S3 Extended Request ID: 2RZe+jaSpkhPEVt1VgPoZ/CsE51SwMQJA4BWVR4qyXB7TazD43NdsEj19AnKKleswZSyo964R0A=
at org.apache.beam.sdk.io.aws.s3.S3WritableByteChannel.<init>(S3WritableByteChannel.java:100)
at org.apache.beam.sdk.io.aws.s3.S3FileSystem.create(S3FileSystem.java:416)
at org.apache.beam.sdk.io.aws.s3.S3FileSystem.create(S3FileSystem.java:90)
at org.apache.beam.sdk.io.FileSystems.create(FileSystems.java:243)
at org.apache.beam.sdk.io.FileBasedSink$Writer.open(FileBasedSink.java:962)
at org.apache.beam.sdk.io.WriteFiles$WriteShardsIntoTempFilesFn.processElement(WriteFiles.java:933)
Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: Please reduce your request rate. (Service: Amazon S3; Status Code: 503; Error Code: SlowDown; Request ID: 5HFF75416KXD5RMN; S3 Extended Request ID: 2RZe+jaSpkhPEVt1VgPoZ/CsE51SwMQJA4BWVR4qyXB7TazD43NdsEj19AnKKleswZSyo964R0A=; Proxy: null), S3 Extended Request ID: 2RZe+jaSpkhPEVt1VgPoZ/CsE51SwMQJA4BWVR4qyXB7TazD43NdsEj19AnKKleswZSyo964R0A=
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1862)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1415)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1384)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1154)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:811)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:779)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:753)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access0(AmazonHttpClient.java:713)
at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:695)
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:559)
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:539)
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5437)
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5384)
at com.amazonaws.services.s3.AmazonS3Client.initiateMultipartUpload(AmazonS3Client.java:3715)
at org.apache.beam.sdk.io.aws.s3.S3WritableByteChannel.<init>(S3WritableByteChannel.java:98)
at org.apache.beam.sdk.io.aws.s3.S3FileSystem.create(S3FileSystem.java:416)
at org.apache.beam.sdk.io.aws.s3.S3FileSystem.create(S3FileSystem.java:90)
at org.apache.beam.sdk.io.FileSystems.create(FileSystems.java:243)
at org.apache.beam.sdk.io.FileBasedSink$Writer.open(FileBasedSink.java:962)
at org.apache.beam.sdk.io.WriteFiles$WriteShardsIntoTempFilesFn.processElement(WriteFiles.java:933)
at org.apache.beam.sdk.io.WriteFiles$WriteShardsIntoTempFilesFn$DoFnInvoker.invokeProcessElement(Unknown Source)
at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:232)
at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:188)
at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)
at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:645)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:50)
at org.apache.beam.runners.flink.FlinkStreamingTransformTranslators$ToGroupByKeyResult.flatMap(FlinkStreamingTransformTranslators.java:1416)
at org.apache.beam.runners.flink.FlinkStreamingTransformTranslators$ToGroupByKeyResult.flatMap(FlinkStreamingTransformTranslators.java:1388)
at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:47)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:1171)
at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:1121)
at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn.outputWindowedValue(GroupAlsoByWindowViaWindowSetNewDoFn.java:99)
at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn.outputWindowedValue(GroupAlsoByWindowViaWindowSetNewDoFn.java:92)
at org.apache.beam.runners.core.ReduceFnRunner.lambda$onTrigger(ReduceFnRunner.java:1058)
at org.apache.beam.runners.core.ReduceFnContextFactory$OnTriggerContextImpl.output(ReduceFnContextFactory.java:445)
at org.apache.beam.runners.core.SystemReduceFn.onTrigger(SystemReduceFn.java:130)
at org.apache.beam.runners.core.ReduceFnRunner.onTrigger(ReduceFnRunner.java:1061)
at org.apache.beam.runners.core.ReduceFnRunner.emit(ReduceFnRunner.java:932)
at org.apache.beam.runners.core.ReduceFnRunner.onTimers(ReduceFnRunner.java:793)
at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn.processElement(GroupAlsoByWindowViaWindowSetNewDoFn.java:137)
at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn$DoFnInvoker.invokeProcessElement(Unknown Source)
at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:232)
at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:191)
at org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:80)
at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)
at org.apache.beam.runners.flink.translation.wrappers.streaming.WindowDoFnOperator.fireTimer(WindowDoFnOperator.java:131)
at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.fireTimerInternal(DoFnOperator.java:1001)
at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.onProcessingTime(DoFnOperator.java:995)
at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:284)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1432)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$deferCallbackToMailbox(StreamTask.java:1423)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:50)
at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:344)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:330)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:202)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)
at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:784)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:571)
at java.base/java.lang.Thread.run(Thread.java:829)
从错误信息来看,简单来说就是S3限流错误
当同一时间对同一存储桶的请求过多时,会出现此错误。 S3限流问题可以咨询S3客服
我正在尝试在 AWS KDA 上将 Apache Beam 与 Flink 结合使用。该管道从运动中读取一些数据,经过简单转换后,它尝试使用以下策略将结果分组到一个分片中:
.apply("Creating micro-batches", Window.<XXX>into(new GlobalWindows())
.triggering(Repeatedly.forever(AfterFirst.of(AfterPane.elementCountAtLeast(windowSizePanes),
AfterProcessingTime.pastFirstElementInPane().alignedTo(windowSizeSeconds))))
.withAllowedLateness(Duration.standardMinutes(60), Window.ClosingBehavior.FIRE_IF_NON_EMPTY)
.discardingFiredPanes()
);
问题是,我遇到了很多 s3 减速异常,并且在发生错误的同一时刻只有 100 个临时文件:
org.apache.flink.streaming.runtime.tasks.AsynchronousException: Caught exception while processing timer.
at org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:1319)
at org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:1295)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1434)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$deferCallbackToMailbox(StreamTask.java:1423)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:50)
at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:344)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:330)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:202)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)
at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:784)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:571)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: TimerException{org.apache.beam.sdk.util.UserCodeException: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator}
... 14 more
Caused by: org.apache.beam.sdk.util.UserCodeException: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn$DoFnInvoker.invokeProcessElement(Unknown Source)
at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:232)
at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:191)
at org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:80)
at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)
at org.apache.beam.runners.flink.translation.wrappers.streaming.WindowDoFnOperator.fireTimer(WindowDoFnOperator.java:131)
at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.fireTimerInternal(DoFnOperator.java:1001)
at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.onProcessingTime(DoFnOperator.java:995)
at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:284)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1432)
... 13 more
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:88)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:1171)
at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:1121)
at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn.outputWindowedValue(GroupAlsoByWindowViaWindowSetNewDoFn.java:99)
at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn.outputWindowedValue(GroupAlsoByWindowViaWindowSetNewDoFn.java:92)
at org.apache.beam.runners.core.ReduceFnRunner.lambda$onTrigger(ReduceFnRunner.java:1058)
at org.apache.beam.runners.core.ReduceFnContextFactory$OnTriggerContextImpl.output(ReduceFnContextFactory.java:445)
at org.apache.beam.runners.core.SystemReduceFn.onTrigger(SystemReduceFn.java:130)
at org.apache.beam.runners.core.ReduceFnRunner.onTrigger(ReduceFnRunner.java:1061)
at org.apache.beam.runners.core.ReduceFnRunner.emit(ReduceFnRunner.java:932)
at org.apache.beam.runners.core.ReduceFnRunner.onTimers(ReduceFnRunner.java:793)
at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn.processElement(GroupAlsoByWindowViaWindowSetNewDoFn.java:137)
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:88)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:50)
at org.apache.beam.runners.flink.FlinkStreamingTransformTranslators$ToGroupByKeyResult.flatMap(FlinkStreamingTransformTranslators.java:1416)
at org.apache.beam.runners.flink.FlinkStreamingTransformTranslators$ToGroupByKeyResult.flatMap(FlinkStreamingTransformTranslators.java:1388)
at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:47)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:1171)
at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:1121)
at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn.outputWindowedValue(GroupAlsoByWindowViaWindowSetNewDoFn.java:99)
at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn.outputWindowedValue(GroupAlsoByWindowViaWindowSetNewDoFn.java:92)
at org.apache.beam.runners.core.ReduceFnRunner.lambda$onTrigger(ReduceFnRunner.java:1058)
at org.apache.beam.runners.core.ReduceFnContextFactory$OnTriggerContextImpl.output(ReduceFnContextFactory.java:445)
at org.apache.beam.runners.core.SystemReduceFn.onTrigger(SystemReduceFn.java:130)
at org.apache.beam.runners.core.ReduceFnRunner.onTrigger(ReduceFnRunner.java:1061)
at org.apache.beam.runners.core.ReduceFnRunner.emit(ReduceFnRunner.java:932)
at org.apache.beam.runners.core.ReduceFnRunner.onTimers(ReduceFnRunner.java:793)
at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn.processElement(GroupAlsoByWindowViaWindowSetNewDoFn.java:137)
at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn$DoFnInvoker.invokeProcessElement(Unknown Source)
at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:232)
at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:191)
at org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:80)
at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)
at org.apache.beam.runners.flink.translation.wrappers.streaming.WindowDoFnOperator.fireTimer(WindowDoFnOperator.java:131)
at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.fireTimerInternal(DoFnOperator.java:1001)
at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.onProcessingTime(DoFnOperator.java:995)
at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:284)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1432)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$deferCallbackToMailbox(StreamTask.java:1423)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:50)
at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:344)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:330)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:202)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)
at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:784)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:571)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.beam.sdk.util.UserCodeException: java.io.IOException: com.amazonaws.services.s3.model.AmazonS3Exception: Please reduce your request rate. (Service: Amazon S3; Status Code: 503; Error Code: SlowDown; Request ID: 5HFF75416KXD5RMN; S3 Extended Request ID: 2RZe+jaSpkhPEVt1VgPoZ/CsE51SwMQJA4BWVR4qyXB7TazD43NdsEj19AnKKleswZSyo964R0A=; Proxy: null), S3 Extended Request ID: 2RZe+jaSpkhPEVt1VgPoZ/CsE51SwMQJA4BWVR4qyXB7TazD43NdsEj19AnKKleswZSyo964R0A=
at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
at org.apache.beam.sdk.io.WriteFiles$WriteShardsIntoTempFilesFn$DoFnInvoker.invokeProcessElement(Unknown Source)
at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:232)
at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:188)
at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)
at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:645)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
... 47 more
Caused by: java.io.IOException: com.amazonaws.services.s3.model.AmazonS3Exception: Please reduce your request rate. (Service: Amazon S3; Status Code: 503; Error Code: SlowDown; Request ID: 5HFF75416KXD5RMN; S3 Extended Request ID: 2RZe+jaSpkhPEVt1VgPoZ/CsE51SwMQJA4BWVR4qyXB7TazD43NdsEj19AnKKleswZSyo964R0A=; Proxy: null), S3 Extended Request ID: 2RZe+jaSpkhPEVt1VgPoZ/CsE51SwMQJA4BWVR4qyXB7TazD43NdsEj19AnKKleswZSyo964R0A=
at org.apache.beam.sdk.io.aws.s3.S3WritableByteChannel.<init>(S3WritableByteChannel.java:100)
at org.apache.beam.sdk.io.aws.s3.S3FileSystem.create(S3FileSystem.java:416)
at org.apache.beam.sdk.io.aws.s3.S3FileSystem.create(S3FileSystem.java:90)
at org.apache.beam.sdk.io.FileSystems.create(FileSystems.java:243)
at org.apache.beam.sdk.io.FileBasedSink$Writer.open(FileBasedSink.java:962)
at org.apache.beam.sdk.io.WriteFiles$WriteShardsIntoTempFilesFn.processElement(WriteFiles.java:933)
Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: Please reduce your request rate. (Service: Amazon S3; Status Code: 503; Error Code: SlowDown; Request ID: 5HFF75416KXD5RMN; S3 Extended Request ID: 2RZe+jaSpkhPEVt1VgPoZ/CsE51SwMQJA4BWVR4qyXB7TazD43NdsEj19AnKKleswZSyo964R0A=; Proxy: null), S3 Extended Request ID: 2RZe+jaSpkhPEVt1VgPoZ/CsE51SwMQJA4BWVR4qyXB7TazD43NdsEj19AnKKleswZSyo964R0A=
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1862)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1415)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1384)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1154)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:811)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:779)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:753)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access0(AmazonHttpClient.java:713)
at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:695)
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:559)
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:539)
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5437)
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5384)
at com.amazonaws.services.s3.AmazonS3Client.initiateMultipartUpload(AmazonS3Client.java:3715)
at org.apache.beam.sdk.io.aws.s3.S3WritableByteChannel.<init>(S3WritableByteChannel.java:98)
at org.apache.beam.sdk.io.aws.s3.S3FileSystem.create(S3FileSystem.java:416)
at org.apache.beam.sdk.io.aws.s3.S3FileSystem.create(S3FileSystem.java:90)
at org.apache.beam.sdk.io.FileSystems.create(FileSystems.java:243)
at org.apache.beam.sdk.io.FileBasedSink$Writer.open(FileBasedSink.java:962)
at org.apache.beam.sdk.io.WriteFiles$WriteShardsIntoTempFilesFn.processElement(WriteFiles.java:933)
at org.apache.beam.sdk.io.WriteFiles$WriteShardsIntoTempFilesFn$DoFnInvoker.invokeProcessElement(Unknown Source)
at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:232)
at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:188)
at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)
at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:645)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:50)
at org.apache.beam.runners.flink.FlinkStreamingTransformTranslators$ToGroupByKeyResult.flatMap(FlinkStreamingTransformTranslators.java:1416)
at org.apache.beam.runners.flink.FlinkStreamingTransformTranslators$ToGroupByKeyResult.flatMap(FlinkStreamingTransformTranslators.java:1388)
at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:47)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:1171)
at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:1121)
at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn.outputWindowedValue(GroupAlsoByWindowViaWindowSetNewDoFn.java:99)
at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn.outputWindowedValue(GroupAlsoByWindowViaWindowSetNewDoFn.java:92)
at org.apache.beam.runners.core.ReduceFnRunner.lambda$onTrigger(ReduceFnRunner.java:1058)
at org.apache.beam.runners.core.ReduceFnContextFactory$OnTriggerContextImpl.output(ReduceFnContextFactory.java:445)
at org.apache.beam.runners.core.SystemReduceFn.onTrigger(SystemReduceFn.java:130)
at org.apache.beam.runners.core.ReduceFnRunner.onTrigger(ReduceFnRunner.java:1061)
at org.apache.beam.runners.core.ReduceFnRunner.emit(ReduceFnRunner.java:932)
at org.apache.beam.runners.core.ReduceFnRunner.onTimers(ReduceFnRunner.java:793)
at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn.processElement(GroupAlsoByWindowViaWindowSetNewDoFn.java:137)
at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn$DoFnInvoker.invokeProcessElement(Unknown Source)
at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:232)
at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:191)
at org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:80)
at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)
at org.apache.beam.runners.flink.translation.wrappers.streaming.WindowDoFnOperator.fireTimer(WindowDoFnOperator.java:131)
at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.fireTimerInternal(DoFnOperator.java:1001)
at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.onProcessingTime(DoFnOperator.java:995)
at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:284)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1432)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$deferCallbackToMailbox(StreamTask.java:1423)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:50)
at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:344)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:330)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:202)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)
at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:784)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:571)
at java.base/java.lang.Thread.run(Thread.java:829)
从错误信息来看,简单来说就是S3限流错误
当同一时间对同一存储桶的请求过多时,会出现此错误。 S3限流问题可以咨询S3客服