Apache Flink FileSink in BATCH execution mode: in-progress files are not transitioned to finished state

What we are trying to do: we are evaluating Flink to perform batch processing using the DataStream API in BATCH mode.

Minimal application to reproduce the issue:

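import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkS3ProcessingDemoApplication {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        // Legacy (pre-FLIP-27) file source
        DataStreamSource<String> source = env.readTextFile("file:///Users/user1/any-text-file.txt");

        // Row-format FileSink with the default rolling policy
        source.sinkTo(FileSink.forRowFormat(new Path("file:///Users/user1/output/"), new SimpleStringEncoder<String>("UTF-8")).build());

        env.execute("Test Flink application");
    }
}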

Flink version: 1.12.2 or 1.13.0

Expected result: "finalized" files in the /Users/user1/output/ folder.

According to the FileSink documentation:

Given that Flink sinks and UDFs in general do not differentiate between normal job termination (e.g. finite input stream) and termination due to failure, upon normal termination of a job, the last in-progress files will not be transitioned to the “finished” state.

With this clarification specific to BATCH mode:

Pending files are committed, i.e. transition to Finished state, after the whole input has been processed.
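To picture the transition these two passages describe, here is a minimal stand-alone sketch using only the JDK. It is not Flink's FileSink implementation: the class name `PartFileLifecycleSketch`, its methods, and the `part-0` file name are hypothetical, and the intermediate "pending" state is collapsed into a single commit step. It only assumes that an in-progress file is a hidden file that is renamed to its final name on commit.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Arrays;
import java.util.UUID;

/** Hypothetical illustration only; this is not Flink's FileSink code. */
final class PartFileLifecycleSketch {

    /** Writes the lines to a hidden ".<part>.inprogress.<uuid>" file and returns its path. */
    static Path writeInProgress(Path bucketDir, String partName, Iterable<String> lines)
            throws IOException {
        Files.createDirectories(bucketDir);
        Path inProgress =
                bucketDir.resolve("." + partName + ".inprogress." + UUID.randomUUID());
        Files.write(inProgress, lines, StandardCharsets.UTF_8);
        return inProgress;
    }

    /** "Commits" the file by renaming it to its final, visible name in the same directory. */
    static Path commit(Path inProgress, String partName) throws IOException {
        Path finished = inProgress.resolveSibling(partName);
        return Files.move(inProgress, finished, StandardCopyOption.ATOMIC_MOVE);
    }

    public static void main(String[] args) throws IOException {
        Path bucket = Files.createTempDirectory("output").resolve("2021-07-13--10");
        Path inProgress = writeInProgress(bucket, "part-0", Arrays.asList("a", "b"));
        System.out.println("in-progress: " + inProgress.getFileName());
        Path finished = commit(inProgress, "part-0");
        System.out.println("finished:    " + finished.getFileName());
    }
}
```

If the commit step never runs, only the hidden in-progress file remains in the bucket directory, which matches the symptom reported in this question.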

Actual result:

.
└── 2021-07-13--10
    ├── .part-707a8590-04cb-4c2d-97b2-5652697d9c76-0.inprogress.7e99df6f-703d-44b3-875a-283e12b31c8e
    ├── .part-a82bcabd-065d-4263-bee0-72f8673f3fd3-0.inprogress.65067b75-ef6c-4185-ae87-fe59de95c86a
    ├── .part-c7c36fd5-fb31-4d55-b783-5373ce69e216-0.inprogress.3e953235-09f1-487b-8229-2cdfa0e2daf4
    └── .part-e66b004a-271f-4aae-9604-e035b2c2cfe3-0.inprogress.add8b0d9-aa89-491e-9a9d-f07b73ab8256
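A quick way to check an output directory for this symptom is to scan it for leftover in-progress files. The following is a rough sketch using only the JDK; the class name `InProgressFileCheck` is hypothetical, and it assumes that in-progress files can be recognized by `.inprogress.` in their name, as in the listing above.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical helper; assumes in-progress files contain ".inprogress." in their name. */
final class InProgressFileCheck {

    /** Returns every regular file under root whose name contains ".inprogress.". */
    static List<Path> findInProgress(Path root) throws IOException {
        try (Stream<Path> files = Files.walk(root)) {
            return files
                    .filter(Files::isRegularFile)
                    .filter(p -> p.getFileName().toString().contains(".inprogress."))
                    .sorted()
                    .collect(Collectors.toList());
        }
    }

    public static void main(String[] args) throws IOException {
        // Default path taken from the question; pass another directory as the first argument.
        Path root = Paths.get(args.length > 0 ? args[0] : "/Users/user1/output");
        List<Path> leftovers = findInProgress(root);
        leftovers.forEach(p -> System.out.println("not finalized: " + p));
        System.out.println(leftovers.size() + " in-progress file(s) found");
    }
}
```

An empty result after a successful job run would indicate that all part files were finalized.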

And the following exception:

Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
    at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult(MiniClusterJobClient.java:117)
    at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
    at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
    at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:237)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
    at org.apache.flink.runtime.concurrent.FutureUtils.onComplete(FutureUtils.java:1046)
    at akka.dispatch.OnComplete.internal(Future.scala:264)
    at akka.dispatch.OnComplete.internal(Future.scala:261)
    at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
    at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
    at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:73)
    at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
    at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
    at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572)
    at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo.applyOrElse(PipeToSupport.scala:22)
    at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo.applyOrElse(PipeToSupport.scala:21)
    at scala.concurrent.Future$$anonfun$andThen.apply(Future.scala:436)
    at scala.concurrent.Future$$anonfun$andThen.apply(Future.scala:435)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
    at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
    at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run.apply$mcV$sp(BatchingExecutor.scala:91)
    at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run.apply(BatchingExecutor.scala:91)
    at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run.apply(BatchingExecutor.scala:91)
    at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
    at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:118)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:80)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:233)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:224)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:215)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:669)
    at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:447)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    ... 4 more
Caused by: java.nio.channels.ClosedChannelException
    at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:110)
    at sun.nio.ch.FileChannelImpl.position(FileChannelImpl.java:253)
    at org.apache.flink.core.fs.local.LocalRecoverableFsDataOutputStream.getPos(LocalRecoverableFsDataOutputStream.java:103)
    at org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter.getSize(OutputStreamBasedPartFileWriter.java:74)
    at org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy.shouldRollOnCheckpoint(DefaultRollingPolicy.java:71)
    at org.apache.flink.connector.file.sink.writer.FileWriterBucket.prepareCommit(FileWriterBucket.java:196)
    at org.apache.flink.connector.file.sink.writer.FileWriter.prepareCommit(FileWriter.java:200)
    at org.apache.flink.streaming.runtime.operators.sink.AbstractSinkWriterOperator.endInput(AbstractSinkWriterOperator.java:97)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.endOperatorInput(StreamOperatorWrapper.java:91)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$close$0(StreamOperatorWrapper.java:128)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:50)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:128)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:135)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.closeOperators(OperatorChain.java:439)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:627)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:589)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
    at java.lang.Thread.run(Thread.java:748)
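Read bottom-up, the innermost cause in this trace is a java.nio.channels.ClosedChannelException thrown by FileChannelImpl.position, reached through getPos() while the sink prepares its commit at end of input. The JDK behavior itself can be reproduced in isolation with the sketch below; it is not Flink code, the class name `ClosedChannelSketch` is hypothetical, and it says nothing about why the channel was already closed in the job.

```java
import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/** Hypothetical illustration of the JDK behavior at the bottom of the trace. */
final class ClosedChannelSketch {

    /** Returns true if asking a closed FileChannel for its position throws ClosedChannelException. */
    static boolean positionFailsAfterClose() throws IOException {
        Path tmp = Files.createTempFile("part-", ".tmp");
        try {
            FileChannel channel = FileChannel.open(tmp, StandardOpenOption.WRITE);
            channel.close();
            try {
                channel.position(); // same call that fails in the trace
                return false;
            } catch (ClosedChannelException expected) {
                return true;
            }
        } finally {
            Files.deleteIfExists(tmp);
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("position() after close() throws: " + positionFailsAfterClose());
    }
}
```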

What we want to know: whether it is possible to use Flink in batch mode in combination with FileSink or StreamingFileSink.

Thanks in advance!

The source interfaces were reworked in FLIP-27 to provide support for BATCH execution mode in the DataStream API. In order to get the FileSink to properly transition PENDING files to FINISHED when running in BATCH mode, you need to use a source that implements FLIP-27, such as the FileSource (instead of readTextFile): https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/connector/file/src/FileSource.html

As you can see in that javadoc, using a FileSource looks something like this:

DataStreamSource<String> source = 
  env.fromSource(
    FileSource.forRecordStreamFormat(
      new TextLineFormat(),
      new Path("file:///Users/user/file.txt")
    ).build(),
    WatermarkStrategy.noWatermarks(),
    "MySourceName"
  );

If you need a bulk format instead, such as parquet, then you can do something more like this:

DataStreamSource<RowData> source = 
  env.fromSource(
    FileSource.forBulkFileFormat(
      new ParquetColumnarRowInputFormat(...),
      new Path("file:///Users/me/data.parquet")
    ).build(),
    WatermarkStrategy.noWatermarks(),
    "MySourceName"
  );

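For parquet there is also ParquetVectorizedInputFormat, and there are formats for orc, etc.

I couldn't use the approach above because I am reading AVRO files and couldn't find an AVRO row format. However, it does work with env.readTextFile if you assign timestamps and watermarks to the input stream like this: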

public class FlinkS3ProcessingDemoApplication {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        DataStream<String> source = env.readTextFile("file:///Users/user1/any-text-file.txt")
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<String>noWatermarks().withTimestampAssigner(new SerializableTimestampAssigner<String>() {
                            @Override
                            public long extractTimestamp(String element, long recordTimestamp) {
                                return -1;
                            }
                        }));

        source.sinkTo(FileSink.forRowFormat(new Path("file:///Users/artvolk/output/"), new SimpleStringEncoder<String>("UTF-8")).build());

        env.execute("Test Flink application");
    }
}