如何为 Apache Flink 创建自定义 POJO

How to create a custom POJO for Apache Flink

我正在使用 Flink 处理来自某些数据源的一些 JSON 格式的数据。

现在,我的过程非常简单:从 JSON 格式的数据中提取每个元素并将它们打印到日志文件中。

这是我的一段代码:

// create proper deserializer to deserializer the JSON-format data into ObjectNode
PravegaDeserializationSchema<ObjectNode> adapter = new PravegaDeserializationSchema<>(ObjectNode.class, new JavaSerializer<>());
// create connector to receive data from Pravega
FlinkPravegaReader<ObjectNode> source = FlinkPravegaReader.<ObjectNode>builder()
    .withPravegaConfig(pravegaConfig)
    .forStream(stream)
    .withDeserializationSchema(adapter)
    .build();
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<ObjectNode> dataStream = env.addSource(source).name("Pravega Stream");
dataStream.???.print();

说从Pravega过来的数据是这样的:{"name":"titi", "age":18}

正如我所说,现在我只需要提取 nameage 并打印它们。

那我该怎么做呢?

据我了解,我需要在???处制作一些自定义代码。我可能需要创建一个包含 ObjectNode 的自定义 POJO class。但我不知道怎么办。我已经阅读了 Flink 的官方文档,也尝试 google 关于如何为 Flink 创建自定义 POJO,但我仍然无法弄清楚。

能举个例子吗?

你为什么不简单地使用一些更有意义的东西而不是 JavaSerializer?也许来自 here

然后您可以创建一个包含您要使用的字段的 POJO,并简单地将 JSON 数据反序列化为您的 POJO 而不是 ObjectNode

此外,如果有某些特定原因您需要 ObjectNode 进行反序列化,那么您可以简单地执行以下操作:

//I assume You have created the class named MyPojo
dataStream.map(new MapFunction<ObjectNode, MyPojo>() {
            ObjectMapper mapper = new ObjectMapper();

            @Override
            public MyPojo map(final ObjectNode value) throws Exception {
                mapper.readValue(value.asText(), MyPojo.class)
            }
})