Flink Streaming:序列化字符串消息中的意外字符

Flink Streaming: Unexpected charaters in serialized String messages

我的流正在生成 Tuple2<String,String>

类型的记录

.toString()输出(usr12345,{"_key":"usr12345","_temperature":46.6})

其中键为 usr12345,值为 {"_key":"usr12345","_temperature":46.6}

流上的.print()正确输出值:

(usr12345,{"_key":"usr12345","_temperature":46.6})

但是当我将流写入 Kafka 时,密钥变为 usr12345(开头为白色 space),值变为 ({"_key":"usr12345","_temperature":46.6}

注意键开头的 space 和值开头的左括号。

很奇怪。为什么会发生这种情况?

这是序列化代码:

TypeInformation<String> resultType = TypeInformation.of(String.class);

KeyedSerializationSchema<Tuple2<String, String>> schema =
      new TypeInformationKeyValueSerializationSchema<>(resultType, resultType, env.getConfig());

FlinkKafkaProducer010.FlinkKafkaProducer010Configuration flinkKafkaProducerConfig = FlinkKafkaProducer010.writeToKafkaWithTimestamps(
      stream,   
      "topic",    
      schema,  
      kafkaProducerProperties);

TypeInformationKeyValueSerializationSchema 使用 Flink 的自定义序列化器序列化数据,这意味着结果必须被解释为二进制数据。 Flink 的 String 序列化器写入 String 的长度,然后对所有字符进行编码。

我假设您使用普通字符串反序列化器反序列化 Kafka 主题。对于密钥,序列化长度被解释为空白字符。对于值,长度被解释为 '('.

尝试使用将键和值序列化为纯字符串的不同序列化程序,或使用兼容的反序列化程序。