在 flink 中解析 A​​vro 消息,如果字段在 Avro Schema 中可以为空,则给出空指针异常

Parsing Avro messages in flink , giving null pointer exception if field is nullable in Avro Schema

我需要解析存储在 Avro 中的融合 Kafka 的消息。但是在应用过滤器时它给出了空指针异常,没有过滤器我能够写回kafka但是在应用过滤器时它给出了空指针异常。

 public static void main(String[] args) throws Exception {

        Properties config = new Properties();
        config.setProperty("bootstrap.servers", "localhost:9092");
        config.setProperty("group.id","topic");
        String schemaRegistryUrl  = "http://localhost:8091";
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        DataStream<topic1> streamIn = env
                .addSource(
                        new FlinkKafkaConsumer<>(
                                "topic1",
                                ConfluentRegistryAvroDeserializationSchema.forSpecific(topic1.class, schemaRegistryUrl),
                                config
                        ).setStartFromEarliest());
                        
        //Question here : want to return only rows which need to qulify below as with below it is giving null pointer exception

        DataStreamSink fltrtsrm_so=streamIn.filter((new FilterFunction<topic1>() {
            public boolean filter(topic1 user) throws Exception {
                return user.get("col3").toString().equals("New");
            }
        })).print();        
        //Also let me know if there is any better way to do it as for me its just the start..)

这是架构:

{
  "namespace": "testclass",
  "type": "record",
  "name": "topic1",
  "fields": [
    {
      "name": "col1",
      "type": "string"
    },
    {
      "name": "col2",
      "type": "string"
    },
    {
      "default": null,
      "name": "col3",
      "type": [
        "null",
        "string"
      ]
    },
    {
      "default": null,
      "name": "col4",
      "type": [
        "null",
        "string"
      ]
    }
    ]
}

当您在 col3 的主题中有 null 值并且您尝试访问它 .toString() 时,您有例外,这是 null.toString().

您应该测试 col3 是否不是 null