如何为 Apache Flink 中的每个输入生成输出文件

How to generate output files for each input in Apache Flink

我正在使用 Flink 处理我的流数据。

流媒体来自其他一些中间件,例如 Kafka、Pravega 等

说 Pravega 正在发送一些词流,hello world my name is...

我需要的是三步流程:

  1. 将每个单词映射到我的自定义 class 对象 MyJson
  2. 将对象 MyJson 映射到字符串。
  3. 将字符串写入文件:一个字符串写入一个文件。

例如,对于流hello world my name is,我应该得到五个文件。

这是我的代码:

// init Pravega connector
PravegaDeserializationSchema<String> adapter = new PravegaDeserializationSchema<>(String.class, new JavaSerializer<>());
        FlinkPravegaReader<String> source = FlinkPravegaReader.<String>builder()
                .withPravegaConfig(pravegaConfig)
                .forStream(stream)
                .withDeserializationSchema(adapter)
                .build();
// map stream to MyJson
DataStream<MyJson> jsonStream = env.addSource(source).name("Pravega Stream")
            .map(new MapFunction<String, MyJson>() {
                @Override
                public MyJson map(String s) throws Exception {
                    MyJson myJson = JSON.parseObject(s, MyJson.class);
                    return myJson;
                }
            });
// map MyJson to String
DataStream<String> valueInJson = jsonStream
            .map(new MapFunction<MyJson, String>() {
                @Override
                public String map(MyJson myJson) throws Exception {
                    return myJson.toString();
                }
            });
// output
valueInJson.print();

此代码会将所有结果输出到 Flink 日志文件。

我的问题是如何将一个单词写入一个输出文件?

我认为最简单的方法是使用自定义接收器。

stream.addSink(new WordFileSink)
public static class WordFileSink implements SinkFunction<String> {

    @Override
    public void invoke(String value, Context context) {
        // generate a unique name for the new file and open it
        // write the word to the file
        // close the file
    }
}

请注意,此实现不一定会提供 exactly once 行为。您可能需要注意文件命名方案既是唯一的又是确定性的(而不是取决于处理时间),并为文件可能已经存在的情况做好准备。