使用 Flink 反序列化 Protobuf kafka 消息
Deserialize Protobuf kafka messages with Flink
我正在尝试使用 Apache Flink 从 Kafka 读取和打印 Protobuf 消息。
Flink消费者代码为:
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI()
env.enableCheckpointing(5000)
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
env.getCheckpointConfig.setCheckpointStorage(s"$targetPath/checkpoints")
env.getConfig.registerTypeWithKryoSerializer(classOf[User], classOf[ProtobufSerializer])
val source = KafkaSource.builder[User]
.setBootstrapServers(brokers)
.setTopics(topic)
.setGroupId(consumerGroupId)
.setValueOnlyDeserializer(new ProtoDeserializer())
.setStartingOffsets(OffsetsInitializer.earliest)
.build
val stream = env.fromSource(source, WatermarkStrategy.noWatermarks[User], kafkaTableName)
stream.print()
env.execute()
}
解串器代码为:
class ProtoDeserializer extends DeserializationSchema[User] {
override def getProducedType: TypeInformation[User] = null
override def deserialize(message: Array[Byte]): User = User.parseFrom(message)
override def isEndOfStream(nextElement: User): Boolean = false
}
执行流媒体时出现以下错误:
Protocol message contained an invalid tag (zero).
值得一提的是,我设法使用 confluent protobuf consumer 成功读取和反序列化消息,因此消息似乎没有损坏。
confluent protobuf 序列化器不会生成可以被其他反序列化器直接反序列化的内容。 The format is described in confluent's documentation:它以一个神奇字节(始终为零)开头,后跟一个四字节模式 ID。接下来是 protobuf 有效负载,从字节 5 开始。
getProducedType
方法应该 return 合适 TypeInformation
,在本例中 TypeInformation.of(User.class)
。如果没有这个,您可能会 运行 在 运行 时遇到问题。
与 KafkaSource
一起使用的反序列化器不需要实现 isEndOfStream
,但它不会造成任何伤害。
我正在尝试使用 Apache Flink 从 Kafka 读取和打印 Protobuf 消息。
Flink消费者代码为:
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI()
env.enableCheckpointing(5000)
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
env.getCheckpointConfig.setCheckpointStorage(s"$targetPath/checkpoints")
env.getConfig.registerTypeWithKryoSerializer(classOf[User], classOf[ProtobufSerializer])
val source = KafkaSource.builder[User]
.setBootstrapServers(brokers)
.setTopics(topic)
.setGroupId(consumerGroupId)
.setValueOnlyDeserializer(new ProtoDeserializer())
.setStartingOffsets(OffsetsInitializer.earliest)
.build
val stream = env.fromSource(source, WatermarkStrategy.noWatermarks[User], kafkaTableName)
stream.print()
env.execute()
}
解串器代码为:
class ProtoDeserializer extends DeserializationSchema[User] {
override def getProducedType: TypeInformation[User] = null
override def deserialize(message: Array[Byte]): User = User.parseFrom(message)
override def isEndOfStream(nextElement: User): Boolean = false
}
执行流媒体时出现以下错误:
Protocol message contained an invalid tag (zero).
值得一提的是,我设法使用 confluent protobuf consumer 成功读取和反序列化消息,因此消息似乎没有损坏。
confluent protobuf 序列化器不会生成可以被其他反序列化器直接反序列化的内容。 The format is described in confluent's documentation:它以一个神奇字节(始终为零)开头,后跟一个四字节模式 ID。接下来是 protobuf 有效负载,从字节 5 开始。
getProducedType
方法应该 return 合适 TypeInformation
,在本例中 TypeInformation.of(User.class)
。如果没有这个,您可能会 运行 在 运行 时遇到问题。
与 KafkaSource
一起使用的反序列化器不需要实现 isEndOfStream
,但它不会造成任何伤害。