Flink Checkpointing mode EXACTLY_ONCE is not working as expected

I am new to Flink, so apologies if my understanding is wrong. I am building a streaming application that contains multiple data streams, each of which checks whether the required fields are present in the incoming records. The application validates the incoming data and, if validation succeeds, appends the data to a given file (if the file already exists). I am trying to simulate that if an exception occurs in one DataStream, the other streams should not be affected, so I deliberately throw an exception in one of the streams. In the example below, for simplicity, I append the data to a text file on Windows.

Note: my pipeline has no state, because I have nothing to store in state.

public class ExceptionTest {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // start a checkpoint every 1000 ms
        env.enableCheckpointing(1000);

       // env.setParallelism(1);

        //env.setStateBackend(new RocksDBStateBackend("file:///C://flinkCheckpoint", true));

        // to set minimum progress time to happen between checkpoints
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);

        // checkpoints have to complete within 5000 ms, or are discarded
        env.getCheckpointConfig().setCheckpointTimeout(5000);

        // set mode to exactly-once (this is the default)
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);  
        
        // allow only one checkpoint to be in progress at the same time
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

        // enable externalized checkpoints which are retained after job cancellation
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);  // DELETE_ON_CANCELLATION

        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
                3, // number of restart attempts
                Time.of(10, TimeUnit.SECONDS) // delay
        ));

        DataStream<String> input1 = env.fromElements("hello");
        
        DataStream<String> input2 = env.fromElements("hello");


        DataStream<String> output1 = input1.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                //out.collect(value.concat(" world"));
                throw new Exception("=====================NO VALUE TO CHECK=================");
            }
        });


        DataStream<String> output2 = input2.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                out.collect(value.concat(" world"));
            }
        });

       output2.addSink(new SinkFunction<String>() {
           @Override
           public void invoke(String value) throws Exception {
               try {
                File myObj = new File("C://flinkOutput//filename.txt");
                if (myObj.createNewFile()) {
                    System.out.println("File created: " + myObj.getName());
                    BufferedWriter out = new BufferedWriter(
                            new FileWriter("C://flinkOutput//filename.txt", true));
                    out.write(value);
                    out.close();
                    System.out.println("Successfully wrote to the file.");
                } else {
                    System.out.println("File already exists.");
                    BufferedWriter out = new BufferedWriter(
                            new FileWriter("C://flinkOutput//filename.txt", true));
                    out.write(value);
                    out.close();
                    System.out.println("Successfully wrote to the file.");
                }
            } catch (IOException e) {
                System.out.println("An error occurred.");
                e.printStackTrace();
            }
           }
       });

        env.execute();

    }
}

I have the following doubts:

  1. When I throw an exception in the output1 stream, the second stream output2 keeps running even after the exception is encountered, and writes the data to the local file. But when I check the file, the output is as follows:

     hello world
     hello world
     hello world
     hello world
    
  2. According to my understanding of the Flink documentation, if I use EXACTLY_ONCE as the checkpointing mode, it should not write the data to the file more than once, since the process has already completed and written the data to the file. But that is not happening in my case, and I do not understand whether I am doing something wrong.

Please help me clear up my doubts about checkpointing, and how I can achieve the EXACTLY_ONCE mechanism. I read about TWO_PHASE_COMMIT in Flink, but I did not find any example of how to implement it.

As suggested by @Mikalai Lushchytski, I implemented the StreamingFileSink below.

With StreamingFileSink:

public class ExceptionTest {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // start a checkpoint every 1000 ms
        env.enableCheckpointing(1000);

       // env.setParallelism(1);

        //env.setStateBackend(new RocksDBStateBackend("file:///C://flinkCheckpoint", true));

        // to set minimum progress time to happen between checkpoints
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);

        // checkpoints have to complete within 5000 ms, or are discarded
        env.getCheckpointConfig().setCheckpointTimeout(5000);

        // set mode to exactly-once (this is the default)
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);  
        
        // allow only one checkpoint to be in progress at the same time
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

        // enable externalized checkpoints which are retained after job cancellation
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);  // DELETE_ON_CANCELLATION

        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
                3, // number of restart attempts
                Time.of(10, TimeUnit.SECONDS) // delay
        ));

        DataStream<String> input1 = env.fromElements("hello");
        
        DataStream<String> input2 = env.fromElements("hello");


        DataStream<String> output1 = input1.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                //out.collect(value.concat(" world"));
                throw new Exception("=====================NO VALUE TO CHECK=================");
            }
        });
        
        
        DataStream<String> output2 = input2.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                out.collect(value.concat(" world"));
            }
        });
        
        
        String outputPath = "C://flinkCheckpoint";

        final StreamingFileSink<String> sink = StreamingFileSink
                .forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
                .withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
                                .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
                                .withMaxPartSize(1)
                                .build())
                .build();
                
        
        output2.addSink(sink);

       

        env.execute();

    }
}

But when I check the checkpoint folder, I can see that it created four part files, as shown below.

Is it because of something I am doing that it creates multiple part files?

In order to guarantee end-to-end exactly-once record delivery (in addition to exactly-once state semantics), the data sink needs to take part in the checkpointing mechanism (as well as the data source).

If you are going to write the data to a file, then you can use a StreamingFileSink, which emits its input elements to FileSystem files within buckets. It is integrated with the checkpointing mechanism to provide exactly-once semantics out of the box.
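A minimal sketch of such a sink (the output path is a placeholder; OnCheckpointRollingPolicy rolls the in-progress part file on every checkpoint, so data only becomes part of a finalized file once a checkpoint completes):

```java
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;

// Row-format file sink that participates in checkpointing: records go into
// an "in-progress" part file and are finalized only when the enclosing
// checkpoint completes, which is what gives end-to-end exactly-once output.
final StreamingFileSink<String> sink = StreamingFileSink
        .forRowFormat(new Path("C://flinkOutput"), new SimpleStringEncoder<String>("UTF-8"))
        .withRollingPolicy(OnCheckpointRollingPolicy.build())
        .build();

output2.addSink(sink);
```

Note that checkpointing must stay enabled (env.enableCheckpointing(...)); without it, the part files remain in-progress/pending forever and are never finalized.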

If you are going to implement your own sink, then the sink function must implement the CheckpointedFunction interface and properly implement the snapshotState(FunctionSnapshotContext context) method, which is called whenever a snapshot for a checkpoint is requested and which must flush the current application state. In addition, I would recommend implementing the CheckpointListener interface, so that the sink is notified once a distributed checkpoint has completed.
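As a sketch of that pattern (the class name and the buffering strategy are illustrative, reusing the text-file target from the question; this is not a production-ready implementation), such a sink could buffer records, snapshot the buffer in snapshotState, and only touch the file once notifyCheckpointComplete fires:

```java
import java.io.BufferedWriter;
import java.io.FileWriter;
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class BufferingFileSink extends RichSinkFunction<String>
        implements CheckpointedFunction, CheckpointListener {

    private final List<String> buffer = new ArrayList<>();
    private transient ListState<String> checkpointedState;

    @Override
    public void invoke(String value, Context context) {
        buffer.add(value);              // do not touch the file yet
    }

    @Override
    public void snapshotState(FunctionSnapshotContext ctx) throws Exception {
        // Called when a checkpoint snapshot is requested: persist the
        // records that have not been flushed to the file yet.
        checkpointedState.clear();
        for (String v : buffer) {
            checkpointedState.add(v);
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext ctx) throws Exception {
        checkpointedState = ctx.getOperatorStateStore().getListState(
                new ListStateDescriptor<>("buffered-records", String.class));
        if (ctx.isRestored()) {
            // After a failure, recover the records that were never flushed.
            for (String v : checkpointedState.get()) {
                buffer.add(v);
            }
        }
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        // Only once the distributed checkpoint has completed is it safe to
        // make the records visible externally.
        try (BufferedWriter out = new BufferedWriter(
                new FileWriter("C://flinkOutput//filename.txt", true))) {
            for (String v : buffer) {
                out.write(v);
                out.newLine();
            }
        }
        buffer.clear();
    }
}
```

Even this pattern is only effectively-once: if the job fails between the checkpoint-complete notification and the file write, records can still be duplicated on replay. That remaining gap is exactly what the two-phase commit protocol below closes.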

Flink already provides an abstract TwoPhaseCommitSinkFunction, which is the recommended base class for every SinkFunction that intends to implement exactly-once semantics. It does this by implementing a two-phase commit protocol on top of CheckpointedFunction and CheckpointListener. As an example, you can have a look at the FlinkKafkaProducer.java source code.
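As an illustration of the protocol (the class name, paths, and the rename-based commit are hypothetical; this is a sketch, not a hardened implementation), a file sink built on TwoPhaseCommitSinkFunction would write each transaction to a temporary file and atomically rename it into place only on commit:

```java
import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.UUID;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;

// <IN = String, TXN = String (the temp-file path), CONTEXT = Void>
public class TwoPhaseFileSink
        extends TwoPhaseCommitSinkFunction<String, String, Void> {

    public TwoPhaseFileSink() {
        super(StringSerializer.INSTANCE, VoidSerializer.INSTANCE);
    }

    @Override
    protected String beginTransaction() {
        // The "transaction" is simply the path of a fresh temp file.
        return "C://flinkOutput//tmp-" + UUID.randomUUID();
    }

    @Override
    protected void invoke(String tmpPath, String value, Context ctx) throws IOException {
        try (BufferedWriter out = new BufferedWriter(new FileWriter(tmpPath, true))) {
            out.write(value);
            out.newLine();
        }
    }

    @Override
    protected void preCommit(String tmpPath) {
        // Phase 1: everything for this transaction is already on disk;
        // a real sink would flush and close any open handles here.
    }

    @Override
    protected void commit(String tmpPath) {
        // Phase 2 (after the checkpoint completed): atomically publish the data.
        try {
            Files.move(Paths.get(tmpPath),
                    Paths.get(tmpPath.replace("tmp-", "committed-")),
                    StandardCopyOption.ATOMIC_MOVE);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    protected void abort(String tmpPath) {
        // The checkpoint failed: throw the uncommitted data away.
        try {
            Files.deleteIfExists(Paths.get(tmpPath));
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}
```

One caveat: commit() can be replayed after a recovery, so a production version must be idempotent, i.e. tolerate the temp file already having been moved; FlinkKafkaProducer handles the analogous case by resuming and re-committing Kafka transactions.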