如何为 Apache Flink 中的每个输入生成输出文件
How to generate output files for each input in Apache Flink
我正在使用 Flink 处理我的流数据。
流媒体来自其他一些中间件,例如 Kafka、Pravega 等
说 Pravega 正在发送一些词流,hello world my name is...
。
我需要的是三步流程:
- 将每个单词映射到我的自定义 class 对象
MyJson
。
- 将对象
MyJson
映射到字符串。
- 将字符串写入文件:一个字符串写入一个文件。
例如,对于流hello world my name is
,我应该得到五个文件。
这是我的代码:
// init Pravega connector
PravegaDeserializationSchema<String> adapter = new PravegaDeserializationSchema<>(String.class, new JavaSerializer<>());
FlinkPravegaReader<String> source = FlinkPravegaReader.<String>builder()
.withPravegaConfig(pravegaConfig)
.forStream(stream)
.withDeserializationSchema(adapter)
.build();
// map stream to MyJson
DataStream<MyJson> jsonStream = env.addSource(source).name("Pravega Stream")
.map(new MapFunction<String, MyJson>() {
@Override
public MyJson map(String s) throws Exception {
MyJson myJson = JSON.parseObject(s, MyJson.class);
return myJson;
}
});
// map MyJson to String
DataStream<String> valueInJson = jsonStream
.map(new MapFunction<MyJson, String>() {
@Override
public String map(MyJson myJson) throws Exception {
return myJson.toString();
}
});
// output
valueInJson.print();
此代码会将所有结果输出到 Flink 日志文件。
我的问题是如何将一个单词写入一个输出文件?
我认为最简单的方法是使用自定义接收器。
stream.addSink(new WordFileSink)
public static class WordFileSink implements SinkFunction<String> {
@Override
public void invoke(String value, Context context) {
// generate a unique name for the new file and open it
// write the word to the file
// close the file
}
}
请注意,此实现不一定会提供 exactly once 行为。您可能需要注意文件命名方案既是唯一的又是确定性的(而不是取决于处理时间),并为文件可能已经存在的情况做好准备。
我正在使用 Flink 处理我的流数据。
流媒体来自其他一些中间件,例如 Kafka、Pravega 等
说 Pravega 正在发送一些词流,hello world my name is...
。
我需要的是三步流程:
- 将每个单词映射到我的自定义 class 对象
MyJson
。 - 将对象
MyJson
映射到字符串。 - 将字符串写入文件:一个字符串写入一个文件。
例如,对于流hello world my name is
,我应该得到五个文件。
这是我的代码:
// init Pravega connector
PravegaDeserializationSchema<String> adapter = new PravegaDeserializationSchema<>(String.class, new JavaSerializer<>());
FlinkPravegaReader<String> source = FlinkPravegaReader.<String>builder()
.withPravegaConfig(pravegaConfig)
.forStream(stream)
.withDeserializationSchema(adapter)
.build();
// map stream to MyJson
DataStream<MyJson> jsonStream = env.addSource(source).name("Pravega Stream")
.map(new MapFunction<String, MyJson>() {
@Override
public MyJson map(String s) throws Exception {
MyJson myJson = JSON.parseObject(s, MyJson.class);
return myJson;
}
});
// map MyJson to String
DataStream<String> valueInJson = jsonStream
.map(new MapFunction<MyJson, String>() {
@Override
public String map(MyJson myJson) throws Exception {
return myJson.toString();
}
});
// output
valueInJson.print();
此代码会将所有结果输出到 Flink 日志文件。
我的问题是如何将一个单词写入一个输出文件?
我认为最简单的方法是使用自定义接收器。
stream.addSink(new WordFileSink)
public static class WordFileSink implements SinkFunction<String> {
@Override
public void invoke(String value, Context context) {
// generate a unique name for the new file and open it
// write the word to the file
// close the file
}
}
请注意,此实现不一定会提供 exactly once 行为。您可能需要注意文件命名方案既是唯一的又是确定性的(而不是取决于处理时间),并为文件可能已经存在的情况做好准备。