使用 Flink 反序列化 Protobuf kafka 消息

Deserialize Protobuf kafka messages with Flink

我正在尝试使用 Apache Flink 从 Kafka 读取和打印 Protobuf 消息。

我按照官方文档没有成功:https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/third_party_serializers/

Flink消费者代码为:

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI()
    env.enableCheckpointing(5000)
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    env.getCheckpointConfig.setCheckpointStorage(s"$targetPath/checkpoints")
    env.getConfig.registerTypeWithKryoSerializer(classOf[User], classOf[ProtobufSerializer])

    val source = KafkaSource.builder[User]
      .setBootstrapServers(brokers)
      .setTopics(topic)
      .setGroupId(consumerGroupId)
      .setValueOnlyDeserializer(new ProtoDeserializer())
      .setStartingOffsets(OffsetsInitializer.earliest)
      .build

    val stream = env.fromSource(source, WatermarkStrategy.noWatermarks[User], kafkaTableName)
    stream.print()
    env.execute()
  }

解串器代码为:

class ProtoDeserializer extends DeserializationSchema[User] {

  override def getProducedType: TypeInformation[User] = null

  override def deserialize(message: Array[Byte]): User = User.parseFrom(message)

  override def isEndOfStream(nextElement: User): Boolean = false
}

执行流媒体时出现以下错误:

Protocol message contained an invalid tag (zero).

值得一提的是,我设法使用 confluent protobuf consumer 成功读取和反序列化消息,因此消息似乎没有损坏。

confluent protobuf 序列化器不会生成可以被其他反序列化器直接反序列化的内容。 The format is described in confluent's documentation:它以一个神奇字节(始终为零)开头,后跟一个四字节模式 ID。接下来是 protobuf 有效负载,从字节 5 开始。

getProducedType 方法应该 return 合适 TypeInformation,在本例中 TypeInformation.of(User.class)。如果没有这个,您可能会 运行 在 运行 时遇到问题。

KafkaSource 一起使用的反序列化器不需要实现 isEndOfStream,但它不会造成任何伤害。