Apache Flink 中的 FileSink 不在输出文件夹中生成日志

FileSink in Apache Flink not generating logs in output folder

我正在使用 Apache Flink 从 kafka 主题读取数据并将其存储在服务器上的文件中。我正在使用 FileSink 存储文件,它会根据日期和时间创建目录结构,但不会创建任何日志文件。

当我 运行 程序创建目录结构如下,但日志文件没有存储在这里。

/flink/testlogs/2021-12-08--07 
/flink/testlogs/2021-12-08--06

我希望日志文件每 15 分钟写入一个新的日志文件。 下面是代码。

DataStream <String> kafkaTopicData = env.addSource(new FlinkKafkaConsumer<String>("MyTopic",new SimpleStringSchema(),p)); 

OutputFileConfig config = OutputFileConfig
                 .builder()
                 .withPartPrefix("prefix")
                 .withPartSuffix(".ext")
                 .build();

DataStream <Tuple6 < String,String,String ,String, String ,Integer >> newStream=kafkaTopicData.map(new LogParser());

final FileSink<Tuple6<String, String, String, String, String, Integer>> sink = FileSink.forRowFormat(new Path("/flink/testlogs"),
                  new SimpleStringEncoder < Tuple6 < String,String,String ,String, String ,Integer >> ("UTF-8"))
                .withRollingPolicy(DefaultRollingPolicy.builder()
                        .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
                        .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
                        .withMaxPartSize(1024 * 1024 * 1024)
                        .build())
                .withOutputFileConfig(config)
                .build();
    
        newStream.sinkTo(sink);

env.execute("DataReader");  

LogParser returns Tuple6. 

在流模式下使用时,Flink 的 FileSink 需要启用检查点。为此,您需要指定要存储检查点的位置,以及希望它们发生的时间间隔。

要在 flink-conf.yaml 中进行配置,您需要执行如下操作:

state.checkpoints.dir: s3://checkpoint-bucket
execution.checkpointing.interval: 10s

或者在您的应用程序代码中,您可以这样做:

env.getCheckpointConfig().setCheckpointStorage("s3://checkpoint-bucket");
env.enableCheckpointing(10000L);

来自 docs 的另一个重要细节:

Given that Flink sinks and UDFs in general do not differentiate between normal job termination (e.g. finite input stream) and termination due to failure, upon normal termination of a job, the last in-progress files will not be transitioned to the “finished” state.