"not a Parquet file (too small)" 在 Spark 结构化流式传输期间来自 Presto 运行

"not a Parquet file (too small)" from Presto during Spark structured streaming run

我设置了一个从 Kafka 读取数据的管道,使用 Spark 结构化流对其进行处理,然后将镶木地板文件写入 HDFS。数据查询的下游客户端正在使用配置为 Hive tables.

读取数据的 Presto

Kafka --> Spark --> Parquet on HDFS --> Presto

总的来说这是可行的。当 Spark 作业是 运行 个批处理时发生查询时,就会出现问题。 Spark 作业在 HDFS 上创建一个零长度的 Parquet 文件。如果 Presto 在处理查询的过程中尝试打开此文件,则会抛出错误:

Query 20171116_170937_07282_489cc failed: Error opening Hive split hdfs://namenode:50071/hive/warehouse/table/part-00000-5a7c242a-3e53-46d0-9ee4-5d004ef4b1e4-c000.snappy.parquet (offset=0, length=0): hdfs://namenode:50071/hive/warehouse/table/part-00000-5a7c242a-3e53-46d0-9ee4-5d004ef4b1e4-c000.snappy.parquet is not a Parquet file (too small)

此时文件确实是零字节,所以错误完全正确,但这不是我想要的管道行为。我希望能够连续写入适当的 HDFS 文件夹,而不会干扰 Presto 查询。

作业的 Spark scala 代码如下所示:

val FilesOnDisk = 1
Spark
  .initKafkaStream("fleet_profile_test")
  .filter(_.name.contains(job.kafkaTag))
  .flatMap(job.parser)
  .coalesce(FilesOnDisk)
  .writeStream
  .trigger(ProcessingTime("1 hours"))
  .outputMode("append")
  .queryName(job.queryName)
  .format("parquet")
  .option("path", job.outputFilesPath)
  .start()

作业在整点 :00 开始。该文件首先在 HDFS 上作为零长度文件显示在 :05。直到它在 :21 完全写入,就在作业完成之前,它才会被更新。这使得 table 在 25% 的时间内无法从 Presto 有效使用。

每个文件只有500kB多一点,所以我预计文件的物理写入不会花费很长时间。据我了解,Parquet 文件的元数据位于文件末尾,因此编写更大文件的人会遇到更多麻烦。

人们在解决此 Presto 错误时使用了哪些策略来集成 Spark 结构化流和 Presto?

您可以尝试说服 Presto(或 Presto 团队)忽略空文件,但这无济于事,因为写入文件的程序(此处为:Spark)最终会刷新部分数据,并且文件会显示为部分,非空且格式不正确,因此也会导致错误。

防止 Presto(或其他读取 table 数据的程序)看到部分文件的方法是在 不同的 位置汇编文件,然后自动 将文件移动到正确的位置。