在 flink 中解析 Avro 消息,如果字段在 Avro Schema 中可以为空,则给出空指针异常
Parsing Avro messages in flink , giving null pointer exception if field is nullable in Avro Schema
我需要解析存储在 Avro 中的融合 Kafka 的消息。但是在应用过滤器时它给出了空指针异常,没有过滤器我能够写回kafka但是在应用过滤器时它给出了空指针异常。
public static void main(String[] args) throws Exception {
Properties config = new Properties();
config.setProperty("bootstrap.servers", "localhost:9092");
config.setProperty("group.id","topic");
String schemaRegistryUrl = "http://localhost:8091";
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<topic1> streamIn = env
.addSource(
new FlinkKafkaConsumer<>(
"topic1",
ConfluentRegistryAvroDeserializationSchema.forSpecific(topic1.class, schemaRegistryUrl),
config
).setStartFromEarliest());
//Question here : want to return only rows which need to qulify below as with below it is giving null pointer exception
DataStreamSink fltrtsrm_so=streamIn.filter((new FilterFunction<topic1>() {
public boolean filter(topic1 user) throws Exception {
return user.get("col3").toString().equals("New");
}
})).print();
//Also let me know if there is any better way to do it as for me its just the start..)
这是架构:
{
"namespace": "testclass",
"type": "record",
"name": "topic1",
"fields": [
{
"name": "col1",
"type": "string"
},
{
"name": "col2",
"type": "string"
},
{
"default": null,
"name": "col3",
"type": [
"null",
"string"
]
},
{
"default": null,
"name": "col4",
"type": [
"null",
"string"
]
}
]
}
当您在 col3 的主题中有 null
值并且您尝试访问它 .toString()
时,您有例外,这是 null.toString()
.
您应该测试 col3
是否不是 null
我需要解析存储在 Avro 中的融合 Kafka 的消息。但是在应用过滤器时它给出了空指针异常,没有过滤器我能够写回kafka但是在应用过滤器时它给出了空指针异常。
public static void main(String[] args) throws Exception {
Properties config = new Properties();
config.setProperty("bootstrap.servers", "localhost:9092");
config.setProperty("group.id","topic");
String schemaRegistryUrl = "http://localhost:8091";
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<topic1> streamIn = env
.addSource(
new FlinkKafkaConsumer<>(
"topic1",
ConfluentRegistryAvroDeserializationSchema.forSpecific(topic1.class, schemaRegistryUrl),
config
).setStartFromEarliest());
//Question here : want to return only rows which need to qulify below as with below it is giving null pointer exception
DataStreamSink fltrtsrm_so=streamIn.filter((new FilterFunction<topic1>() {
public boolean filter(topic1 user) throws Exception {
return user.get("col3").toString().equals("New");
}
})).print();
//Also let me know if there is any better way to do it as for me its just the start..)
这是架构:
{
"namespace": "testclass",
"type": "record",
"name": "topic1",
"fields": [
{
"name": "col1",
"type": "string"
},
{
"name": "col2",
"type": "string"
},
{
"default": null,
"name": "col3",
"type": [
"null",
"string"
]
},
{
"default": null,
"name": "col4",
"type": [
"null",
"string"
]
}
]
}
当您在 col3 的主题中有 null
值并且您尝试访问它 .toString()
时,您有例外,这是 null.toString()
.
您应该测试 col3
是否不是 null