"not a Parquet file (too small)" 在 Spark 结构化流式传输期间来自 Presto 运行
"not a Parquet file (too small)" from Presto during Spark structured streaming run
我设置了一个从 Kafka 读取数据的管道,使用 Spark 结构化流对其进行处理,然后将镶木地板文件写入 HDFS。数据查询的下游客户端正在使用配置为 Hive tables.
读取数据的 Presto
Kafka --> Spark --> Parquet on HDFS --> Presto
总的来说这是可行的。当 Spark 作业是 运行 个批处理时发生查询时,就会出现问题。 Spark 作业在 HDFS 上创建一个零长度的 Parquet 文件。如果 Presto 在处理查询的过程中尝试打开此文件,则会抛出错误:
Query 20171116_170937_07282_489cc failed: Error opening Hive split hdfs://namenode:50071/hive/warehouse/table/part-00000-5a7c242a-3e53-46d0-9ee4-5d004ef4b1e4-c000.snappy.parquet (offset=0, length=0): hdfs://namenode:50071/hive/warehouse/table/part-00000-5a7c242a-3e53-46d0-9ee4-5d004ef4b1e4-c000.snappy.parquet is not a Parquet file (too small)
此时文件确实是零字节,所以错误完全正确,但这不是我想要的管道行为。我希望能够连续写入适当的 HDFS 文件夹,而不会干扰 Presto 查询。
作业的 Spark scala 代码如下所示:
val FilesOnDisk = 1
Spark
.initKafkaStream("fleet_profile_test")
.filter(_.name.contains(job.kafkaTag))
.flatMap(job.parser)
.coalesce(FilesOnDisk)
.writeStream
.trigger(ProcessingTime("1 hours"))
.outputMode("append")
.queryName(job.queryName)
.format("parquet")
.option("path", job.outputFilesPath)
.start()
作业在整点 :00 开始。该文件首先在 HDFS 上作为零长度文件显示在 :05。直到它在 :21 完全写入,就在作业完成之前,它才会被更新。这使得 table 在 25% 的时间内无法从 Presto 有效使用。
每个文件只有500kB多一点,所以我预计文件的物理写入不会花费很长时间。据我了解,Parquet 文件的元数据位于文件末尾,因此编写更大文件的人会遇到更多麻烦。
人们在解决此 Presto 错误时使用了哪些策略来集成 Spark 结构化流和 Presto?
您可以尝试说服 Presto(或 Presto 团队)忽略空文件,但这无济于事,因为写入文件的程序(此处为:Spark)最终会刷新部分数据,并且文件会显示为部分,非空且格式不正确,因此也会导致错误。
防止 Presto(或其他读取 table 数据的程序)看到部分文件的方法是在 不同的 位置汇编文件,然后自动 将文件移动到正确的位置。
我设置了一个从 Kafka 读取数据的管道,使用 Spark 结构化流对其进行处理,然后将镶木地板文件写入 HDFS。数据查询的下游客户端正在使用配置为 Hive tables.
读取数据的 PrestoKafka --> Spark --> Parquet on HDFS --> Presto
总的来说这是可行的。当 Spark 作业是 运行 个批处理时发生查询时,就会出现问题。 Spark 作业在 HDFS 上创建一个零长度的 Parquet 文件。如果 Presto 在处理查询的过程中尝试打开此文件,则会抛出错误:
Query 20171116_170937_07282_489cc failed: Error opening Hive split hdfs://namenode:50071/hive/warehouse/table/part-00000-5a7c242a-3e53-46d0-9ee4-5d004ef4b1e4-c000.snappy.parquet (offset=0, length=0): hdfs://namenode:50071/hive/warehouse/table/part-00000-5a7c242a-3e53-46d0-9ee4-5d004ef4b1e4-c000.snappy.parquet is not a Parquet file (too small)
此时文件确实是零字节,所以错误完全正确,但这不是我想要的管道行为。我希望能够连续写入适当的 HDFS 文件夹,而不会干扰 Presto 查询。
作业的 Spark scala 代码如下所示:
val FilesOnDisk = 1
Spark
.initKafkaStream("fleet_profile_test")
.filter(_.name.contains(job.kafkaTag))
.flatMap(job.parser)
.coalesce(FilesOnDisk)
.writeStream
.trigger(ProcessingTime("1 hours"))
.outputMode("append")
.queryName(job.queryName)
.format("parquet")
.option("path", job.outputFilesPath)
.start()
作业在整点 :00 开始。该文件首先在 HDFS 上作为零长度文件显示在 :05。直到它在 :21 完全写入,就在作业完成之前,它才会被更新。这使得 table 在 25% 的时间内无法从 Presto 有效使用。
每个文件只有500kB多一点,所以我预计文件的物理写入不会花费很长时间。据我了解,Parquet 文件的元数据位于文件末尾,因此编写更大文件的人会遇到更多麻烦。
人们在解决此 Presto 错误时使用了哪些策略来集成 Spark 结构化流和 Presto?
您可以尝试说服 Presto(或 Presto 团队)忽略空文件,但这无济于事,因为写入文件的程序(此处为:Spark)最终会刷新部分数据,并且文件会显示为部分,非空且格式不正确,因此也会导致错误。
防止 Presto(或其他读取 table 数据的程序)看到部分文件的方法是在 不同的 位置汇编文件,然后自动 将文件移动到正确的位置。