从 Kafka Streams 反序列化对象时出错
Error While Deserializing object from Kafka Streams
在写入主题时,在 kafka 流方面,我使用来自 kafka 的 Serdes.String()
序列化器序列化了 Window 键 [test_id@timestamp1/timestamp2]
。从另一个应用程序检索相同的密钥时,我在反序列化时遇到以下错误
com.fasterxml.jackson.databind.JsonMappingException: Can not deserialize instance of java.lang.String out of START_ARRAY token
at [Source: [B@37e7c0b2; line: 1, column: 1]
at com.fasterxml.jackson.databind.JsonMappingException.from(JsonMappingException.java:270)
at com.fasterxml.jackson.databind.DeserializationContext.reportMappingException(DeserializationContext.java:1234)
at com.fasterxml.jackson.databind.DeserializationContext.handleUnexpectedToken(DeserializationContext.java:1122)
at com.fasterxml.jackson.databind.DeserializationContext.handleUnexpectedToken(DeserializationContext.java:1075)
at com.fasterxml.jackson.databind.deser.std.StringDeserializer.deserialize(StringDeserializer.java:60)
at com.fasterxml.jackson.databind.deser.std.StringDeserializer.deserialize(StringDeserializer.java:11)
at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3798)
at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2929)
at TestAlert.extract(TestAlert.java:483)
at TestAlert.extract(TestAlert.java:1)
at org.apache.ignite.stream.StreamAdapter.addMessage(StreamAdapter.java:181)
at org.apache.ignite.stream.kafka.KafkaStreamer.access0(KafkaStreamer.java:47)
at org.apache.ignite.stream.kafka.KafkaStreamer.run(KafkaStreamer.java:156)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
以下是我为序列化 Windowed Key 编写的代码。这里 testWinAlerts
是开窗后的聚合结果,其中 <Windowd<String>>
作为键
testWinAlerts.toStream((k,v)->k.toString()).filter((k,v)->{
return (v!=null);}).to(Serdes.String(),aggrMessageSerde,"Some-Topic");
下面是反序列化器的代码,用于再次将 bytes[] 转换为 String 格式的键。其中 msg.key()[特定于 Ignite] 在从主题消费后以字节格式提供密钥。
String windowKey = objectMapper.readValue(msg.key(), String.class);
在进一步测试中,我还尝试从 Window 字符串中删除 "@", "/", "[", "]"
个字符,然后再将其写入 kafka 主题,然后它起作用了。但在实际实现中,我不想增加从 String 中删除这些字符的额外开销。那么我怎样才能消除这个错误呢?
您正在使用 StringSerde
将输入序列化为 字符串 ,但随后您正尝试使用 Jackson 对其进行反序列化,这需要 JSON 字符串 作为输入。常规字符串可以是任何一系列字符。但是 JSON 字符串看起来像 "string"
——根据定义,它以 "
开始和结束。所以你不能用 Jackson 反序列化任何字符串,在它的序列化状态下,它必须以 "
开始和结束。为什么不使用 StringSerde
来反序列化密钥?
在写入主题时,在 kafka 流方面,我使用来自 kafka 的 Serdes.String()
序列化器序列化了 Window 键 [test_id@timestamp1/timestamp2]
。从另一个应用程序检索相同的密钥时,我在反序列化时遇到以下错误
com.fasterxml.jackson.databind.JsonMappingException: Can not deserialize instance of java.lang.String out of START_ARRAY token
at [Source: [B@37e7c0b2; line: 1, column: 1]
at com.fasterxml.jackson.databind.JsonMappingException.from(JsonMappingException.java:270)
at com.fasterxml.jackson.databind.DeserializationContext.reportMappingException(DeserializationContext.java:1234)
at com.fasterxml.jackson.databind.DeserializationContext.handleUnexpectedToken(DeserializationContext.java:1122)
at com.fasterxml.jackson.databind.DeserializationContext.handleUnexpectedToken(DeserializationContext.java:1075)
at com.fasterxml.jackson.databind.deser.std.StringDeserializer.deserialize(StringDeserializer.java:60)
at com.fasterxml.jackson.databind.deser.std.StringDeserializer.deserialize(StringDeserializer.java:11)
at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3798)
at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2929)
at TestAlert.extract(TestAlert.java:483)
at TestAlert.extract(TestAlert.java:1)
at org.apache.ignite.stream.StreamAdapter.addMessage(StreamAdapter.java:181)
at org.apache.ignite.stream.kafka.KafkaStreamer.access0(KafkaStreamer.java:47)
at org.apache.ignite.stream.kafka.KafkaStreamer.run(KafkaStreamer.java:156)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
以下是我为序列化 Windowed Key 编写的代码。这里 testWinAlerts
是开窗后的聚合结果,其中 <Windowd<String>>
作为键
testWinAlerts.toStream((k,v)->k.toString()).filter((k,v)->{
return (v!=null);}).to(Serdes.String(),aggrMessageSerde,"Some-Topic");
下面是反序列化器的代码,用于再次将 bytes[] 转换为 String 格式的键。其中 msg.key()[特定于 Ignite] 在从主题消费后以字节格式提供密钥。
String windowKey = objectMapper.readValue(msg.key(), String.class);
在进一步测试中,我还尝试从 Window 字符串中删除 "@", "/", "[", "]"
个字符,然后再将其写入 kafka 主题,然后它起作用了。但在实际实现中,我不想增加从 String 中删除这些字符的额外开销。那么我怎样才能消除这个错误呢?
您正在使用 StringSerde
将输入序列化为 字符串 ,但随后您正尝试使用 Jackson 对其进行反序列化,这需要 JSON 字符串 作为输入。常规字符串可以是任何一系列字符。但是 JSON 字符串看起来像 "string"
——根据定义,它以 "
开始和结束。所以你不能用 Jackson 反序列化任何字符串,在它的序列化状态下,它必须以 "
开始和结束。为什么不使用 StringSerde
来反序列化密钥?