使用 spark 和 scala 进行连接计数时获得性能的最佳方法
Best way to gain performance when doing a join count using spark and scala
我有一个验证摄取操作的要求,基本上,我在 HDFS 中有两个大文件,一个是 avro 格式(摄取文件),另一个是镶木地板格式(合并文件)。
Avro 文件具有此架构:
文件名、日期、计数、afield1、afield2、afield3、afield4、afield5、afield6、...afieldN
Parquet 文件具有此架构:
fileName,anotherField1,anotherField1,anotherField2,anotherField3,anotherField14,...,anotherFieldN
如果我尝试在 DataFrame 中加载这两个文件,然后尝试使用简单的 join-where,我本地机器上的作业需要超过 24 小时!这是不可接受的。
ingestedDF.join(consolidatedDF).where($"filename" === $"fileName").count()
¿实现此目标的最佳方法是什么?在进行连接位置计数之前从 DataFrame 中删除列? ¿计算每个数据帧的计数,然后加入并求和?
PD
我正在阅读有关 map-side-joint 技术的内容,但如果有一个小文件能够放入 RAM 中,这种技术似乎对我有用,但我不能保证,所以,我想知道这是社区实现此目标的首选方式。
我会通过将数据剥离到我感兴趣的字段 (filename
) 来解决这个问题,制作一组唯一的文件名及其来源(原始数据集) .
此时,两个中间数据集都具有相同的模式,因此我们可以合并它们并进行计数。这应该比对完整数据使用 join
快几个数量级。
// prepare some random dataset
val data1 = (1 to 100000).filter(_ => scala.util.Random.nextDouble<0.8).map(i => (s"file$i", i, "rubbish"))
val data2 = (1 to 100000).filter(_ => scala.util.Random.nextDouble<0.7).map(i => (s"file$i", i, "crap"))
val df1 = sparkSession.createDataFrame(data1).toDF("filename", "index", "data")
val df2 = sparkSession.createDataFrame(data2).toDF("filename", "index", "data")
// select only the column we are interested in and tag it with the source.
// Lets make it distinct as we are only interested in the unique file count
val df1Filenames = df1.select("filename").withColumn("df", lit("df1")).distinct
val df2Filenames = df2.select("filename").withColumn("df", lit("df2")).distinct
// union both dataframes
val union = df1Filenames.union(df2Filenames).toDF("filename","source")
// let's count the occurrences of filename, by using a groupby operation
val occurrenceCount = union.groupBy("filename").count
// we're interested in the count of those files that appear in both datasets (with a count of 2)
occurrenceCount.filter($"count"===2).count
我有一个验证摄取操作的要求,基本上,我在 HDFS 中有两个大文件,一个是 avro 格式(摄取文件),另一个是镶木地板格式(合并文件)。
Avro 文件具有此架构:
文件名、日期、计数、afield1、afield2、afield3、afield4、afield5、afield6、...afieldN
Parquet 文件具有此架构:
fileName,anotherField1,anotherField1,anotherField2,anotherField3,anotherField14,...,anotherFieldN
如果我尝试在 DataFrame 中加载这两个文件,然后尝试使用简单的 join-where,我本地机器上的作业需要超过 24 小时!这是不可接受的。
ingestedDF.join(consolidatedDF).where($"filename" === $"fileName").count()
¿实现此目标的最佳方法是什么?在进行连接位置计数之前从 DataFrame 中删除列? ¿计算每个数据帧的计数,然后加入并求和?
PD
我正在阅读有关 map-side-joint 技术的内容,但如果有一个小文件能够放入 RAM 中,这种技术似乎对我有用,但我不能保证,所以,我想知道这是社区实现此目标的首选方式。
我会通过将数据剥离到我感兴趣的字段 (filename
) 来解决这个问题,制作一组唯一的文件名及其来源(原始数据集) .
此时,两个中间数据集都具有相同的模式,因此我们可以合并它们并进行计数。这应该比对完整数据使用 join
快几个数量级。
// prepare some random dataset
val data1 = (1 to 100000).filter(_ => scala.util.Random.nextDouble<0.8).map(i => (s"file$i", i, "rubbish"))
val data2 = (1 to 100000).filter(_ => scala.util.Random.nextDouble<0.7).map(i => (s"file$i", i, "crap"))
val df1 = sparkSession.createDataFrame(data1).toDF("filename", "index", "data")
val df2 = sparkSession.createDataFrame(data2).toDF("filename", "index", "data")
// select only the column we are interested in and tag it with the source.
// Lets make it distinct as we are only interested in the unique file count
val df1Filenames = df1.select("filename").withColumn("df", lit("df1")).distinct
val df2Filenames = df2.select("filename").withColumn("df", lit("df2")).distinct
// union both dataframes
val union = df1Filenames.union(df2Filenames).toDF("filename","source")
// let's count the occurrences of filename, by using a groupby operation
val occurrenceCount = union.groupBy("filename").count
// we're interested in the count of those files that appear in both datasets (with a count of 2)
occurrenceCount.filter($"count"===2).count