在 Spark 中禁用镶木地板元数据摘要
Disable parquet metadata summary in Spark
我有一个 spark 作业(适用于 1.4.1)接收一连串的 kafka 事件。
我想将它们作为 parquet on tachyon 不断保存。
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
lines.window(Seconds(1), Seconds(1)).foreachRDD { (rdd, time) =>
if (rdd.count() > 0) {
val mil = time.floor(Duration(86400000)).milliseconds
hiveContext.read.json(rdd).toDF().write.mode(SaveMode.Append).parquet(s"tachyon://192.168.1.12:19998/persisted5$mil")
hiveContext.sql(s"CREATE TABLE IF NOT EXISTS persisted5$mil USING org.apache.spark.sql.parquet OPTIONS ( path 'tachyon://192.168.1.12:19998/persisted5$mil')")
}
}
但是我看到随着时间的推移,在每次镶木地板写入时,spark 每 1 秒就会穿过镶木地板部分,变得越来越慢
15/08/22 22:04:05 INFO : open(tachyon://192.168.1.12:19998/persisted51440201600000/part-r-00000-db03b24d-6f98-4b5d-bb40-530f35b82633.gz.parquet, 65536)
15/08/22 22:04:05 INFO : open(tachyon://192.168.1.12:19998/persisted51440201600000/part-r-00000-3a7857e2-0435-4ee0-ab2c-6d40224f8842.gz.parquet, 65536)
15/08/22 22:04:05 INFO : open(tachyon://192.168.1.12:19998/persisted51440201600000/part-r-00000-47ff2ac1-da00-4473-b3f7-52640014bc5b.gz.parquet, 65536)
15/08/22 22:04:05 INFO : open(tachyon://192.168.1.12:19998/persisted51440201600000/part-r-00000-61625436-7353-4b1e-bb8d-e8afad3a582e.gz.parquet, 65536)
15/08/22 22:04:05 INFO : open(tachyon://192.168.1.12:19998/persisted51440201600000/part-r-00000-e711aa9a-9bf5-41d5-8523-f5edafa69626.gz.parquet, 65536)
15/08/22 22:04:05 INFO : open(tachyon://192.168.1.12:19998/persisted51440201600000/part-r-00000-4e0cca38-cf75-4771-8965-20a30c863100.gz.parquet, 65536)
15/08/22 22:04:05 INFO : open(tachyon://192.168.1.12:19998/persisted51440201600000/part-r-00000-d1510ed4-2c99-43e2-b3d1-38d3d54e626d.gz.parquet, 65536)
15/08/22 22:04:05 INFO : open(tachyon://192.168.1.12:19998/persisted51440201600000/part-r-00000-022d1918-392d-433f-a7f4-074e46b4460f.gz.parquet, 65536)
15/08/22 22:04:05 INFO : open(tachyon://192.168.1.12:19998/persisted51440201600000/part-r-00000-cf71f5d2-ba0e-4729-9aa1-41dad5d1d08f.gz.parquet, 65536)
15/08/22 22:04:05 INFO : open(tachyon://192.168.1.12:19998/persisted51440201600000/part-r-00000-ce990b1e-82cc-4feb-a162-ac3ddc275609.gz.parquet, 65536)
我得出结论,这是由于汇总数据的更新,我相信spark不会使用它们。所以我想禁用它
parquet sources 表明我应该能够将 "parquet.enable.summary-metadata" 设置为 false。
现在,我试过这样设置,就在创建 hiveContext 之后
hiveContext.sparkContext.hadoopConfiguration.setBoolean("parquet.enable.summary-metadata", false)
hiveContext.sparkContext.hadoopConfiguration.setInt("parquet.metadata.read.parallelism", 10)
但没有成功,我仍然得到显示并行度为 5(默认)的日志。
使用 parquet 在 spark 中禁用摘要数据的正确方法是什么?
将 "parquet.enable.summary-metadata" 设置为文本("false" 而不是 false)似乎对我们有用。
顺便说一句,Spark 确实使用了 _common_metadata 文件(我们手动复制该文件用于重复性工作)
Spark 2.0 不再默认保存元数据摘要,参见SPARK-15719。
如果您正在处理托管在 S3 中的数据,您可能 仍然会发现 parquet 本身尝试扫描所有对象的尾部以检查其模式会影响 parquet 性能。可以明确禁用
sparkConf.set("spark.sql.parquet.mergeSchema", "false")
我有一个 spark 作业(适用于 1.4.1)接收一连串的 kafka 事件。 我想将它们作为 parquet on tachyon 不断保存。
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
lines.window(Seconds(1), Seconds(1)).foreachRDD { (rdd, time) =>
if (rdd.count() > 0) {
val mil = time.floor(Duration(86400000)).milliseconds
hiveContext.read.json(rdd).toDF().write.mode(SaveMode.Append).parquet(s"tachyon://192.168.1.12:19998/persisted5$mil")
hiveContext.sql(s"CREATE TABLE IF NOT EXISTS persisted5$mil USING org.apache.spark.sql.parquet OPTIONS ( path 'tachyon://192.168.1.12:19998/persisted5$mil')")
}
}
但是我看到随着时间的推移,在每次镶木地板写入时,spark 每 1 秒就会穿过镶木地板部分,变得越来越慢
15/08/22 22:04:05 INFO : open(tachyon://192.168.1.12:19998/persisted51440201600000/part-r-00000-db03b24d-6f98-4b5d-bb40-530f35b82633.gz.parquet, 65536)
15/08/22 22:04:05 INFO : open(tachyon://192.168.1.12:19998/persisted51440201600000/part-r-00000-3a7857e2-0435-4ee0-ab2c-6d40224f8842.gz.parquet, 65536)
15/08/22 22:04:05 INFO : open(tachyon://192.168.1.12:19998/persisted51440201600000/part-r-00000-47ff2ac1-da00-4473-b3f7-52640014bc5b.gz.parquet, 65536)
15/08/22 22:04:05 INFO : open(tachyon://192.168.1.12:19998/persisted51440201600000/part-r-00000-61625436-7353-4b1e-bb8d-e8afad3a582e.gz.parquet, 65536)
15/08/22 22:04:05 INFO : open(tachyon://192.168.1.12:19998/persisted51440201600000/part-r-00000-e711aa9a-9bf5-41d5-8523-f5edafa69626.gz.parquet, 65536)
15/08/22 22:04:05 INFO : open(tachyon://192.168.1.12:19998/persisted51440201600000/part-r-00000-4e0cca38-cf75-4771-8965-20a30c863100.gz.parquet, 65536)
15/08/22 22:04:05 INFO : open(tachyon://192.168.1.12:19998/persisted51440201600000/part-r-00000-d1510ed4-2c99-43e2-b3d1-38d3d54e626d.gz.parquet, 65536)
15/08/22 22:04:05 INFO : open(tachyon://192.168.1.12:19998/persisted51440201600000/part-r-00000-022d1918-392d-433f-a7f4-074e46b4460f.gz.parquet, 65536)
15/08/22 22:04:05 INFO : open(tachyon://192.168.1.12:19998/persisted51440201600000/part-r-00000-cf71f5d2-ba0e-4729-9aa1-41dad5d1d08f.gz.parquet, 65536)
15/08/22 22:04:05 INFO : open(tachyon://192.168.1.12:19998/persisted51440201600000/part-r-00000-ce990b1e-82cc-4feb-a162-ac3ddc275609.gz.parquet, 65536)
我得出结论,这是由于汇总数据的更新,我相信spark不会使用它们。所以我想禁用它
parquet sources 表明我应该能够将 "parquet.enable.summary-metadata" 设置为 false。
现在,我试过这样设置,就在创建 hiveContext 之后
hiveContext.sparkContext.hadoopConfiguration.setBoolean("parquet.enable.summary-metadata", false)
hiveContext.sparkContext.hadoopConfiguration.setInt("parquet.metadata.read.parallelism", 10)
但没有成功,我仍然得到显示并行度为 5(默认)的日志。
使用 parquet 在 spark 中禁用摘要数据的正确方法是什么?
将 "parquet.enable.summary-metadata" 设置为文本("false" 而不是 false)似乎对我们有用。
顺便说一句,Spark 确实使用了 _common_metadata 文件(我们手动复制该文件用于重复性工作)
Spark 2.0 不再默认保存元数据摘要,参见SPARK-15719。
如果您正在处理托管在 S3 中的数据,您可能 仍然会发现 parquet 本身尝试扫描所有对象的尾部以检查其模式会影响 parquet 性能。可以明确禁用
sparkConf.set("spark.sql.parquet.mergeSchema", "false")