如何为 Apache Flink 创建自定义 POJO
How to create a custom POJO for Apache Flink
我正在使用 Flink 处理来自某些数据源的一些 JSON 格式的数据。
现在,我的过程非常简单:从 JSON 格式的数据中提取每个元素并将它们打印到日志文件中。
这是我的一段代码:
// create proper deserializer to deserializer the JSON-format data into ObjectNode
PravegaDeserializationSchema<ObjectNode> adapter = new PravegaDeserializationSchema<>(ObjectNode.class, new JavaSerializer<>());
// create connector to receive data from Pravega
FlinkPravegaReader<ObjectNode> source = FlinkPravegaReader.<ObjectNode>builder()
.withPravegaConfig(pravegaConfig)
.forStream(stream)
.withDeserializationSchema(adapter)
.build();
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<ObjectNode> dataStream = env.addSource(source).name("Pravega Stream");
dataStream.???.print();
说从Pravega过来的数据是这样的:{"name":"titi", "age":18}
正如我所说,现在我只需要提取 name
和 age
并打印它们。
那我该怎么做呢?
据我了解,我需要在???
处制作一些自定义代码。我可能需要创建一个包含 ObjectNode
的自定义 POJO class。但我不知道怎么办。我已经阅读了 Flink 的官方文档,也尝试 google 关于如何为 Flink 创建自定义 POJO,但我仍然无法弄清楚。
能举个例子吗?
你为什么不简单地使用一些更有意义的东西而不是 JavaSerializer
?也许来自 here。
然后您可以创建一个包含您要使用的字段的 POJO,并简单地将 JSON 数据反序列化为您的 POJO 而不是 ObjectNode
此外,如果有某些特定原因您需要 ObjectNode
进行反序列化,那么您可以简单地执行以下操作:
//I assume You have created the class named MyPojo
dataStream.map(new MapFunction<ObjectNode, MyPojo>() {
ObjectMapper mapper = new ObjectMapper();
@Override
public MyPojo map(final ObjectNode value) throws Exception {
mapper.readValue(value.asText(), MyPojo.class)
}
})
我正在使用 Flink 处理来自某些数据源的一些 JSON 格式的数据。
现在,我的过程非常简单:从 JSON 格式的数据中提取每个元素并将它们打印到日志文件中。
这是我的一段代码:
// create proper deserializer to deserializer the JSON-format data into ObjectNode
PravegaDeserializationSchema<ObjectNode> adapter = new PravegaDeserializationSchema<>(ObjectNode.class, new JavaSerializer<>());
// create connector to receive data from Pravega
FlinkPravegaReader<ObjectNode> source = FlinkPravegaReader.<ObjectNode>builder()
.withPravegaConfig(pravegaConfig)
.forStream(stream)
.withDeserializationSchema(adapter)
.build();
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<ObjectNode> dataStream = env.addSource(source).name("Pravega Stream");
dataStream.???.print();
说从Pravega过来的数据是这样的:{"name":"titi", "age":18}
正如我所说,现在我只需要提取 name
和 age
并打印它们。
那我该怎么做呢?
据我了解,我需要在???
处制作一些自定义代码。我可能需要创建一个包含 ObjectNode
的自定义 POJO class。但我不知道怎么办。我已经阅读了 Flink 的官方文档,也尝试 google 关于如何为 Flink 创建自定义 POJO,但我仍然无法弄清楚。
能举个例子吗?
你为什么不简单地使用一些更有意义的东西而不是 JavaSerializer
?也许来自 here。
然后您可以创建一个包含您要使用的字段的 POJO,并简单地将 JSON 数据反序列化为您的 POJO 而不是 ObjectNode
此外,如果有某些特定原因您需要 ObjectNode
进行反序列化,那么您可以简单地执行以下操作:
//I assume You have created the class named MyPojo
dataStream.map(new MapFunction<ObjectNode, MyPojo>() {
ObjectMapper mapper = new ObjectMapper();
@Override
public MyPojo map(final ObjectNode value) throws Exception {
mapper.readValue(value.asText(), MyPojo.class)
}
})