Apache Flink 中的 FileSink 不在输出文件夹中生成日志
FileSink in Apache Flink not generating logs in output folder
我正在使用 Apache Flink 从 kafka 主题读取数据并将其存储在服务器上的文件中。我正在使用 FileSink 存储文件,它会根据日期和时间创建目录结构,但不会创建任何日志文件。
当我 运行 程序创建目录结构如下,但日志文件没有存储在这里。
/flink/testlogs/2021-12-08--07
/flink/testlogs/2021-12-08--06
我希望日志文件每 15 分钟写入一个新的日志文件。
下面是代码。
DataStream <String> kafkaTopicData = env.addSource(new FlinkKafkaConsumer<String>("MyTopic",new SimpleStringSchema(),p));
OutputFileConfig config = OutputFileConfig
.builder()
.withPartPrefix("prefix")
.withPartSuffix(".ext")
.build();
DataStream <Tuple6 < String,String,String ,String, String ,Integer >> newStream=kafkaTopicData.map(new LogParser());
final FileSink<Tuple6<String, String, String, String, String, Integer>> sink = FileSink.forRowFormat(new Path("/flink/testlogs"),
new SimpleStringEncoder < Tuple6 < String,String,String ,String, String ,Integer >> ("UTF-8"))
.withRollingPolicy(DefaultRollingPolicy.builder()
.withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
.withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
.withMaxPartSize(1024 * 1024 * 1024)
.build())
.withOutputFileConfig(config)
.build();
newStream.sinkTo(sink);
env.execute("DataReader");
LogParser returns Tuple6.
在流模式下使用时,Flink 的 FileSink
需要启用检查点。为此,您需要指定要存储检查点的位置,以及希望它们发生的时间间隔。
要在 flink-conf.yaml
中进行配置,您需要执行如下操作:
state.checkpoints.dir: s3://checkpoint-bucket
execution.checkpointing.interval: 10s
或者在您的应用程序代码中,您可以这样做:
env.getCheckpointConfig().setCheckpointStorage("s3://checkpoint-bucket");
env.enableCheckpointing(10000L);
来自 docs 的另一个重要细节:
Given that Flink sinks and UDFs in general do not differentiate between normal job termination (e.g. finite input stream) and termination due to failure, upon normal termination of a job, the last in-progress files will not be transitioned to the “finished” state.
我正在使用 Apache Flink 从 kafka 主题读取数据并将其存储在服务器上的文件中。我正在使用 FileSink 存储文件,它会根据日期和时间创建目录结构,但不会创建任何日志文件。
当我 运行 程序创建目录结构如下,但日志文件没有存储在这里。
/flink/testlogs/2021-12-08--07
/flink/testlogs/2021-12-08--06
我希望日志文件每 15 分钟写入一个新的日志文件。 下面是代码。
DataStream <String> kafkaTopicData = env.addSource(new FlinkKafkaConsumer<String>("MyTopic",new SimpleStringSchema(),p));
OutputFileConfig config = OutputFileConfig
.builder()
.withPartPrefix("prefix")
.withPartSuffix(".ext")
.build();
DataStream <Tuple6 < String,String,String ,String, String ,Integer >> newStream=kafkaTopicData.map(new LogParser());
final FileSink<Tuple6<String, String, String, String, String, Integer>> sink = FileSink.forRowFormat(new Path("/flink/testlogs"),
new SimpleStringEncoder < Tuple6 < String,String,String ,String, String ,Integer >> ("UTF-8"))
.withRollingPolicy(DefaultRollingPolicy.builder()
.withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
.withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
.withMaxPartSize(1024 * 1024 * 1024)
.build())
.withOutputFileConfig(config)
.build();
newStream.sinkTo(sink);
env.execute("DataReader");
LogParser returns Tuple6.
在流模式下使用时,Flink 的 FileSink
需要启用检查点。为此,您需要指定要存储检查点的位置,以及希望它们发生的时间间隔。
要在 flink-conf.yaml
中进行配置,您需要执行如下操作:
state.checkpoints.dir: s3://checkpoint-bucket
execution.checkpointing.interval: 10s
或者在您的应用程序代码中,您可以这样做:
env.getCheckpointConfig().setCheckpointStorage("s3://checkpoint-bucket");
env.enableCheckpointing(10000L);
来自 docs 的另一个重要细节:
Given that Flink sinks and UDFs in general do not differentiate between normal job termination (e.g. finite input stream) and termination due to failure, upon normal termination of a job, the last in-progress files will not be transitioned to the “finished” state.