Error in serialization when consuming JSON Objects from Kafka
I am trying to produce JSON objects to Kafka and consume them manually. I am using the JsonPOJO Serdes from org.apache.kafka.streams.examples.pageview.
My producer code is:
package JsonProducer;
imports ...
public class jsnPdc {
    public static void main(String[] args) throws IOException {
        byte[] arr = "XXXX THIS IS TEST DATA \n XYZ".getBytes();
        JSONObject jsn = new JSONObject();
        jsn.put("Header_Title", (Arrays.copyOfRange(arr, 0, 4)));
        jsn.put("Data_Part", (Arrays.copyOfRange(arr, 4, arr.length)));
        Properties props = new Properties();
        props.put("bootstrap.servers", "xxxxxxxxxxxxxxxxxxxxx:xxxx");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.streams.examples.pageview.JsonPOJOSerializer");
        KafkaProducer<String, JSONObject> pdc = new KafkaProducer<>(props);
        pdc.send(new ProducerRecord<String,JSONObject>("testoutput", jsn));
        System.in.read();
    }
}
The consumer code is:
package testConsumer;
imports ...
public class consumer_0 {
    static public void main(String[] argv) throws ParseException {
        //Configuration
        Properties props = new Properties();
        props.put("bootstrap.servers", "xxxxxxxxxxxxxxxxxxx:xxxx");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "false");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.streams.examples.pageview.JsonPOJODeserializer");
        //Create Consumer Object
        KafkaConsumer<String, JSONObject> consumer = new KafkaConsumer<String, JSONObject>(props);
        consumer.subscribe(Arrays.asList("testoutput"));
        //Keep Polling Records
        System.out.println("Polling new record...\n");
        while (true) {
            ConsumerRecords<String, JSONObject> records = consumer.poll(100);
            //Print Each Record
            for (ConsumerRecord<String, JSONObject> record : records){
                JSONObject json = record.value();
                //Some print code, print(json) ...
            }
        }
    }
}
I run into this problem:
Exception in thread "main" org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition testoutput-0 at offset 20491
Caused by: org.apache.kafka.common.errors.SerializationException: java.lang.IllegalArgumentException: Unrecognized Type: [null]
Caused by: java.lang.IllegalArgumentException: Unrecognized Type: [null]
at com.fasterxml.jackson.databind.type.TypeFactory._fromAny(TypeFactory.java:1170)
at com.fasterxml.jackson.databind.type.TypeFactory.constructType(TypeFactory.java:618)
at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2929)
at org.apache.kafka.streams.examples.pageview.JsonPOJODeserializer.deserialize(JsonPOJODeserializer.java:49)
at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:882)
at org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:788)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:480)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1061)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
at testConsumer.consumer_0.main(consumer_0.java:43)
I need the value fields of the JSON to be byte arrays. Any idea why this is happening?
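You have misunderstood whose responsibility it is to serialize the values. You are telling Kafka to serialize the values you give it with org.apache.kafka.streams.examples.pageview.JsonPOJOSerializer, which expects a plain Java object, something like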
class Data {
    private String headerTitle;
    private String dataPart;
    //... constructors, getters, setters
}
But you have actually handed a JSONObject to ProducerRecord (in other words, you have already serialized the data yourself before giving it to Kafka, and then Kafka tries to serialize it again).
You can either serialize your jsn value yourself and use org.apache.kafka.common.serialization.StringSerializer as your value.serializer, or you can keep org.apache.kafka.streams.examples.pageview.JsonPOJOSerializer, define a class like Data above, and pass an instance of that class to ProducerRecord.
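The two options can be sketched roughly as follows. This is only an illustration under some assumptions, not the asker's actual code: the class name ProducerOptionsSketch, the helper names, and the localhost:9092 address are all made up here, and the Data fields are declared as byte[] because the question asks for byte-array values. JSON has no byte-array type, so the sketch Base64-encodes the bytes, which is also how Jackson writes byte[] fields by default. The sketch uses only the JDK so it runs without Kafka on the classpath; the actual send call appears as a comment.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Base64;
import java.util.Properties;

// Hypothetical class name; nothing in this sketch needs Kafka on the classpath.
class ProducerOptionsSketch {

    /** Option 2: a plain Java object that a POJO-based JSON serializer can handle. */
    public static class Data {
        private byte[] headerTitle;
        private byte[] dataPart;

        public Data() { } // no-arg constructor, commonly expected by JSON mappers

        public Data(byte[] headerTitle, byte[] dataPart) {
            this.headerTitle = headerTitle;
            this.dataPart = dataPart;
        }

        public byte[] getHeaderTitle() { return headerTitle; }
        public void setHeaderTitle(byte[] headerTitle) { this.headerTitle = headerTitle; }
        public byte[] getDataPart() { return dataPart; }
        public void setDataPart(byte[] dataPart) { this.dataPart = dataPart; }
    }

    /**
     * Option 1: build the JSON text yourself and hand Kafka a String.
     * Base64 output contains only A-Z, a-z, 0-9, '+', '/', '=', so no JSON escaping is needed.
     */
    static String toJson(Data d) {
        Base64.Encoder enc = Base64.getEncoder();
        return "{\"headerTitle\":\"" + enc.encodeToString(d.getHeaderTitle())
             + "\",\"dataPart\":\"" + enc.encodeToString(d.getDataPart()) + "\"}";
    }

    /** Producer settings for option 1: the value is already a String, so use StringSerializer. */
    static Properties stringValueProducerProps(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        byte[] arr = "XXXX THIS IS TEST DATA \n XYZ".getBytes(StandardCharsets.UTF_8);
        Data data = new Data(Arrays.copyOfRange(arr, 0, 4),
                             Arrays.copyOfRange(arr, 4, arr.length));

        String json = toJson(data);
        System.out.println(json);

        Properties props = stringValueProducerProps("localhost:9092"); // placeholder address
        System.out.println(props.getProperty("value.serializer"));

        // With kafka-clients on the classpath, option 1 would then send the String:
        // new KafkaProducer<String, String>(props).send(new ProducerRecord<>("testoutput", json));
    }
}
```

For option 2 you would instead keep JsonPOJOSerializer as the value.serializer, declare the producer as KafkaProducer<String, Data>, and pass the Data instance itself to ProducerRecord. Whichever option you choose, the consumer has to mirror it: with option 1 that likely means StringDeserializer on the consumer side, followed by parsing the JSON text and Base64-decoding the two fields.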