Flink Streaming:序列化字符串消息中的意外字符
Flink Streaming: Unexpected charaters in serialized String messages
我的流正在生成 Tuple2<String,String>
类型的记录
.toString()
输出(usr12345,{"_key":"usr12345","_temperature":46.6})
其中键为 usr12345
,值为 {"_key":"usr12345","_temperature":46.6}
流上的.print()
正确输出值:
(usr12345,{"_key":"usr12345","_temperature":46.6})
但是当我将流写入 Kafka 时,密钥变为 usr12345
(开头为白色 space),值变为 ({"_key":"usr12345","_temperature":46.6}
注意键开头的 space 和值开头的左括号。
很奇怪。为什么会发生这种情况?
这是序列化代码:
TypeInformation<String> resultType = TypeInformation.of(String.class);
KeyedSerializationSchema<Tuple2<String, String>> schema =
new TypeInformationKeyValueSerializationSchema<>(resultType, resultType, env.getConfig());
FlinkKafkaProducer010.FlinkKafkaProducer010Configuration flinkKafkaProducerConfig = FlinkKafkaProducer010.writeToKafkaWithTimestamps(
stream,
"topic",
schema,
kafkaProducerProperties);
TypeInformationKeyValueSerializationSchema
使用 Flink 的自定义序列化器序列化数据,这意味着结果必须被解释为二进制数据。 Flink 的 String 序列化器写入 String 的长度,然后对所有字符进行编码。
我假设您使用普通字符串反序列化器反序列化 Kafka 主题。对于密钥,序列化长度被解释为空白字符。对于值,长度被解释为 '('
.
尝试使用将键和值序列化为纯字符串的不同序列化程序,或使用兼容的反序列化程序。
我的流正在生成 Tuple2<String,String>
.toString()
输出(usr12345,{"_key":"usr12345","_temperature":46.6})
其中键为 usr12345
,值为 {"_key":"usr12345","_temperature":46.6}
流上的.print()
正确输出值:
(usr12345,{"_key":"usr12345","_temperature":46.6})
但是当我将流写入 Kafka 时,密钥变为 usr12345
(开头为白色 space),值变为 ({"_key":"usr12345","_temperature":46.6}
注意键开头的 space 和值开头的左括号。
很奇怪。为什么会发生这种情况?
这是序列化代码:
TypeInformation<String> resultType = TypeInformation.of(String.class);
KeyedSerializationSchema<Tuple2<String, String>> schema =
new TypeInformationKeyValueSerializationSchema<>(resultType, resultType, env.getConfig());
FlinkKafkaProducer010.FlinkKafkaProducer010Configuration flinkKafkaProducerConfig = FlinkKafkaProducer010.writeToKafkaWithTimestamps(
stream,
"topic",
schema,
kafkaProducerProperties);
TypeInformationKeyValueSerializationSchema
使用 Flink 的自定义序列化器序列化数据,这意味着结果必须被解释为二进制数据。 Flink 的 String 序列化器写入 String 的长度,然后对所有字符进行编码。
我假设您使用普通字符串反序列化器反序列化 Kafka 主题。对于密钥,序列化长度被解释为空白字符。对于值,长度被解释为 '('
.
尝试使用将键和值序列化为纯字符串的不同序列化程序,或使用兼容的反序列化程序。