Flink streaming app produces many inprogress empty output files

I have the following streaming application, which reads Protobuf messages from a Kafka topic and writes them to a file system Parquet sink:

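import java.util.concurrent.TimeUnit

import com.twitter.chill.protobuf.ProtobufSerializer
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.serialization.DeserializationSchema
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.core.fs.Path
import org.apache.flink.formats.parquet.protobuf.ParquetProtoWriters
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.scala._

class ProtoDeserializer extends DeserializationSchema[User] {

  override def getProducedType: TypeInformation[User] = TypeInformation.of(classOf[User])

  // Drop the first 6 bytes of each record, then parse the rest as a Protobuf User
  override def deserialize(message: Array[Byte]): User =
    User.parseFrom(message.slice(6, message.length))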

  override def isEndOfStream(nextElement: User): Boolean = false
}

object StreamingKafkaProtoToParquetLocalFs {

  private val brokers = "localhost:9092"
  private val topic = "test-topic-proto"
  private val consumerGroupId = "test-consumer-proto"
  private val targetPath = "file:///my/path"


  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI()
    env.enableCheckpointing(TimeUnit.MINUTES.toMillis(1))
    env.getCheckpointConfig.setCheckpointStorage(s"$targetPath/checkpoints")
    env.getConfig.registerTypeWithKryoSerializer(classOf[User], classOf[ProtobufSerializer])

    val source = KafkaSource.builder[User]
      .setBootstrapServers(brokers)
      .setTopics(topic)
      .setGroupId(consumerGroupId)
      .setValueOnlyDeserializer(new ProtoDeserializer)
      .setStartingOffsets(OffsetsInitializer.earliest())
      .build

    val input: DataStream[User] = env.fromSource(source, WatermarkStrategy.noWatermarks[User], "KafKaTable")
    val sink: StreamingFileSink[User] = StreamingFileSink
      .forBulkFormat(new Path(s"$targetPath/data"), ParquetProtoWriters.forType(classOf[User]))
      .build()
    input.addSink(sink)
    env.execute()
  }
}

When I run the program, every output file written to the target path is empty (size 0) and still marked inprogress, even though I enabled checkpointing.

It is worth mentioning that the topic is not empty: when I change the sink to print(), the messages are printed correctly.

What am I missing? Why do print and the Parquet sink behave differently?

You appear to be using a recent version of Flink, so try making the following change:

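import org.apache.flink.connector.file.sink.FileSink

val sink: FileSink[User] = FileSink
  .forBulkFormat(new Path(s"$targetPath/data"), ParquetProtoWriters.forType(classOf[User]))
  .build()

// FileSink implements the unified Sink API, so attach it with sinkTo instead of addSink
input.sinkTo(sink)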

StreamingFileSink is deprecated and is being replaced by FileSink.
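For context on the inprogress files: with bulk formats such as Parquet, part files are rolled on checkpoints, so a file only leaves the in-progress state once a checkpoint completes. The sketch below makes that policy explicit. The helper name `ParquetFileSinks.forProto` is hypothetical and not part of Flink, and the snippet assumes a Flink version that ships `FileSink` in `flink-connector-files` along with `flink-parquet` on the classpath.

```scala
import com.google.protobuf.Message
import org.apache.flink.connector.file.sink.FileSink
import org.apache.flink.core.fs.Path
import org.apache.flink.formats.parquet.protobuf.ParquetProtoWriters
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy

// Hypothetical helper (not a Flink API): builds a Parquet FileSink for any Protobuf message type.
object ParquetFileSinks {
  def forProto[T <: Message](clazz: Class[T], basePath: String): FileSink[T] =
    FileSink
      .forBulkFormat[T](new Path(basePath), ParquetProtoWriters.forType(clazz))
      // Bulk formats roll part files on every checkpoint; stated explicitly here for clarity.
      .withRollingPolicy(OnCheckpointRollingPolicy.build[T, String]())
      .build()
}
```

With the code from the question, this could be used as `input.sinkTo(ParquetFileSinks.forProto(classOf[User], s"$targetPath/data"))`. If checkpoints never complete, the files would be expected to stay in progress regardless of which sink class is used.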

It seems that explicitly adding the Apache Parquet Protobuf dependency solves the problem.

For Maven users, the following dependency was added to pom.xml:

<dependency>
    <groupId>org.apache.parquet</groupId>
    <artifactId>parquet-protobuf</artifactId>
    <version>1.11.1</version>
</dependency>
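
For sbt builds, the same coordinates could be declared as sketched below. This assumes the same artifact and version as the Maven snippet above; the version may need to be aligned with the Parquet version your Flink release uses.

```scala
// build.sbt: same group, artifact, and version as the Maven dependency above
libraryDependencies += "org.apache.parquet" % "parquet-protobuf" % "1.11.1"
```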