Apache Flink - "BuckeingSink" class 的用法

Apache Flink - usage of "BuckeingSink" class

我正在研究使用 BucketingSink 写入 hdfs 文件的 poc class。尽管数据正在写入 hdfs 文件,但文件在 hdfs 上以“.pending”存在。

下面是我正在使用的代码。有人可以帮我找出问题并帮我解决这个问题吗?

BucketingSink<String> HdfsSink = new BucketingSink<String>("hdfs://xxxx/xxxx/xxxx/Test/");
HdfsSink.setBucketer(new DateTimeBucketer<String>("yyyy-MM-dd--HHmm"));
HdfsSink.setBatchSize(1024 * 1024 * 2); // this is 2 MB,
HdfsSink.setInactiveBucketCheckInterval(10000L);
HdfsSink.setInactiveBucketThreshold(10000L);

嗨,你可以用这个。

您好,未完成的存储桶具有 .pending 扩展名。一旦桶关闭(例如,对于时间桶,一旦时间结束),文件将被重命名。由于您使用的是 NonRollingBucketer,因此文件永远不会关闭。我建议您使用 DateTimeBuketer。

附带说明:我建议您稍微增加检查点间隔。 123 毫秒非常频繁,应用程序看起来不像是延迟非常关键。像 2000 毫秒这样的值可能更合适。

我发现文件保持为 .pending 的真正原因是...因为我没有启用检查点。一旦我启用了检查点...文件正在成功关闭而不是 .pending.

您可以通过设置 env.enableCheckpointing(<duration>)

来启用检查点

请查看url@https://ci.apache.org/projects/flink/flink-docs-release-1.3/api/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.html了解更多详情。