Flink: how to use FasterXML / jackson-dataformats-text to convert CSV to a POJO

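I receive a CSV in a class, and I need to take its values to build a POJO. I don't have to open a "file.csv" from a directory: Flink passes the comma-separated elements to EventDeserializationSchema, which uses the Event class to process each event.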

For example:

IN: "'Adam','Smith',66,....'12:01:00.000'" -> OUT: Pojo
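The example input quotes its text values with single quotes. Jackson's `CsvSchema` uses `"` as its default quote character, so with a schema built via `CsvSchema.builder()` the apostrophes would most likely stay inside the string values (`'Adam'` rather than `Adam`); calling `.setQuoteChar('\'')` on the `CsvSchema.Builder` should change that. The plain-Java sketch below is not Jackson: it only illustrates what the quote character changes when one line is split into fields. The class `QuotedCsvSplitter` is hypothetical, and the sketch ignores multi-line records and whitespace trimming.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: splits one CSV line, honoring a configurable quote character. */
final class QuotedCsvSplitter {
    private QuotedCsvSplitter() {
    }

    /**
     * Splits on the separator; text between quote characters is one field and the
     * quotes are stripped. A doubled quote inside a quoted field becomes one literal quote.
     */
    static List<String> split(String line, char separator, char quote) {
        List<String> fields = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean inQuotes = false;
        for (int i = 0; i < line.length(); i++) {
            char c = line.charAt(i);
            if (inQuotes) {
                if (c == quote) {
                    if (i + 1 < line.length() && line.charAt(i + 1) == quote) {
                        current.append(quote); // escaped (doubled) quote
                        i++;
                    } else {
                        inQuotes = false; // closing quote
                    }
                } else {
                    current.append(c);
                }
            } else if (c == quote) {
                inQuotes = true; // opening quote
            } else if (c == separator) {
                fields.add(current.toString()); // end of the current field
                current.setLength(0);
            } else {
                current.append(c);
            }
        }
        fields.add(current.toString()); // last field has no trailing separator
        return fields;
    }
}
```

For a shortened four-column line, `QuotedCsvSplitter.split("'Adam','Smith',66,'12:01:00.000'", ',', '\'')` yields `Adam`, `Smith`, `66` and `12:01:00.000`, while passing `'"'` as the quote character keeps the apostrophes in the text fields.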

For this I am using: https://github.com/FasterXML/jackson-dataformats-text/tree/master/csv

This is my Event class, which is supposed to solve the problem but currently does nothing.

import java.io.Serializable;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectReader;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema;

public class Event implements Serializable {

    CsvSchema schema = CsvSchema.builder()
            .addColumn("firstName")
            .addColumn("lastName")
            .addColumn("age", CsvSchema.ColumnType.NUMBER)
            .addColumn("time")
            .build();
    
    CsvSchema schema = CsvSchema.emptySchema().withHeader();

    CsvSchema bootstrapSchema = CsvSchema.emptySchema().withHeader();
    ObjectMapper mapper = new CsvMapper();
    mapper.readerFor(Pojo.class).with(bootstrapSchema).readValue(??);
    
    return Pojo
}

This is my Pojo class:

public class Pojo {
    
        public String firstName;
        public String lastName;
        private int age;
        public String time;

        public Pojo(String firstName, String lastName, int age, String time) {
            this.firstName = firstName;
            this.lastName = lastName;
            this.age = age;
            this.time = time;
            
        }

}

Any help getting the class to return a Pojo would be greatly appreciated.

Here is the equivalent example for JSON: https://github.com/apache/flink-playgrounds/blob/master/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/records/ClickEventDeserializationSchema.java

ClickEvent class: https://github.com/apache/flink/blob/9dd04a25bd300a725486ff08560920f548f3b1d9/flink-end-to-end-tests/flink-streaming-kafka-test-base/src/main/java/org/apache/flink/streaming/kafka/test/base/KafkaEvent.java#L27

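To make this work, you need a default (no-argument) constructor and getters/setters for the fields. I don't understand what you are trying to do in Event and why there is also a Pojo, but assuming you want to deserialize the incoming string into an Event, something like this should work: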
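The reason for this requirement: a bean-style binder such as Jackson's default POJO deserializer first instantiates the class and then fills in the properties one by one. The sketch below imitates that with plain Java reflection. It is a simplification (Jackson can also write public fields directly or use a constructor annotated with `@JsonCreator`), and `BeanBinder`, `SampleEvent` and `ArgsOnly` are hypothetical names. `ArgsOnly`, like the `Pojo` class in the question, only has an all-arguments constructor.

```java
import java.lang.reflect.Method;
import java.util.List;

/** Hypothetical bean: public no-arg constructor plus getters/setters. */
class SampleEvent {
    private String firstName;
    private int age;

    public SampleEvent() {
    }

    public String getFirstName() { return firstName; }
    public void setFirstName(String firstName) { this.firstName = firstName; }
    public int getAge() { return age; }
    public void setAge(int age) { this.age = age; }
}

/** Hypothetical class shaped like the question's Pojo: no no-arg constructor. */
class ArgsOnly {
    private final String name;

    public ArgsOnly(String name) { this.name = name; }
    public String getName() { return name; }
}

/** Simplified bean binding: no-arg constructor, then one setter call per column. */
final class BeanBinder {
    private BeanBinder() {
    }

    /** Only String and int properties are handled in this sketch. */
    static <T> T bind(Class<T> type, List<String> columns, List<String> values)
            throws ReflectiveOperationException {
        // Throws NoSuchMethodException when the class has no no-arg constructor.
        T bean = type.getDeclaredConstructor().newInstance();
        for (int i = 0; i < columns.size(); i++) {
            String column = columns.get(i);
            String setterName = "set" + Character.toUpperCase(column.charAt(0)) + column.substring(1);
            Method setter = findSetter(type, setterName);
            Class<?> paramType = setter.getParameterTypes()[0];
            // Convert the raw text to the setter's parameter type (int or String here).
            Object arg = paramType == int.class ? Integer.parseInt(values.get(i).trim()) : values.get(i);
            setter.invoke(bean, arg);
        }
        return bean;
    }

    private static Method findSetter(Class<?> type, String name) throws NoSuchMethodException {
        for (Method m : type.getMethods()) {
            if (m.getName().equals(name) && m.getParameterCount() == 1) {
                return m;
            }
        }
        throw new NoSuchMethodException(type.getName() + "." + name);
    }
}
```

Binding the columns `firstName` and `age` onto `SampleEvent` succeeds, while `ArgsOnly` fails with `NoSuchMethodException` because there is no constructor to call before the setters; Jackson rejects a class like the question's `Pojo` with a comparable error.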

1. The Event POJO class:
import java.io.Serializable;

public class Event implements Serializable {
    public String firstName;
    public String lastName;
    private int age;
    public String time;

    public Event() {
    }

    public String getFirstName() {
        return firstName;
    }

    public void setFirstName(String firstName) {
        this.firstName = firstName;
    }

    public String getLastName() {
        return lastName;
    }

    public void setLastName(String lastName) {
        this.lastName = lastName;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    public String getTime() {
        return time;
    }

    public void setTime(String time) {
        this.time = time;
    }
}
2. An implementation of deserialize():
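import java.io.IOException;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

// Plain Jackson imports; the shaded org.apache.flink.shaded.jackson2... equivalents used in the question should work the same way if they are on the classpath.
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.csv.CsvMapper;
import com.fasterxml.jackson.dataformat.csv.CsvSchema;

public class EventDeserializationSchema implements DeserializationSchema<Event> {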

    private static final long serialVersionUID = 1L;

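    // Positional schema (no header): column order must match the order of values in each record.
    private static final CsvSchema schema = CsvSchema.builder()
            .addColumn("firstName")
            .addColumn("lastName")
            .addColumn("age", CsvSchema.ColumnType.NUMBER)
            .addColumn("time")
            .build();

    // Static: created once per JVM and not serialized along with the schema instance Flink ships to the workers.
    private static final ObjectMapper mapper = new CsvMapper();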

    @Override
    public Event deserialize(byte[] message) throws IOException {
        return mapper.readerFor(Event.class).with(schema).readValue(message);
    }

    @Override
    public boolean isEndOfStream(Event nextElement) {
        return false;
    }

    @Override
    public TypeInformation<Event> getProducedType() {
        return TypeInformation.of(Event.class);
    }
}
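Flink hands `deserialize` the raw bytes of one record at a time (for example, the value of one Kafka message), not a file. The sketch below mimics that contract in plain Java so it can run without Flink: `RecordDeserializer` is a hypothetical stand-in for the two `DeserializationSchema` methods used above, `SourceLoopSketch.consume` is a simplified, hypothetical stand-in for a source's read loop, and `FirstFieldDeserializer` is a toy implementation. As with Flink's `isEndOfStream`, an element for which it returns `true` is not emitted.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the two DeserializationSchema methods used above. */
interface RecordDeserializer<T> {
    T deserialize(byte[] message) throws IOException;

    boolean isEndOfStream(T nextElement);
}

/** Simplified source loop: one deserialize call per raw record. */
final class SourceLoopSketch {
    private SourceLoopSketch() {
    }

    static <T> List<T> consume(List<byte[]> records, RecordDeserializer<T> schema) throws IOException {
        List<T> out = new ArrayList<>();
        for (byte[] record : records) {
            T element = schema.deserialize(record);
            if (schema.isEndOfStream(element)) {
                break; // the end-of-stream element itself is not emitted
            }
            out.add(element);
        }
        return out;
    }
}

/** Toy deserializer: decodes UTF-8 and returns the first comma-separated field. */
final class FirstFieldDeserializer implements RecordDeserializer<String> {
    @Override
    public String deserialize(byte[] message) {
        String line = new String(message, StandardCharsets.UTF_8);
        int comma = line.indexOf(',');
        return comma < 0 ? line : line.substring(0, comma);
    }

    @Override
    public boolean isEndOfStream(String nextElement) {
        return "EOF".equals(nextElement);
    }
}
```

With the records `Adam,Smith,66`, `Eve,Jones,40`, `EOF` and `Late,Record,1`, the loop returns `[Adam, Eve]`: the `EOF` record ends the stream without being emitted, and the record after it is never read.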