使用 spark 和 scala 进行连接计数时获得性能的最佳方法

Best way to gain performance when doing a join count using spark and scala

我有一个验证摄取操作的要求,基本上,我在 HDFS 中有两个大文件,一个是 avro 格式(摄取文件),另一个是镶木地板格式(合并文件)。

Avro 文件具有此架构:

文件名、日期、计数、afield1、afield2、afield3、afield4、afield5、afield6、...afieldN

Parquet 文件具有此架构:

fileName,anotherField1,anotherField1,anotherField2,anotherField3,anotherField14,...,anotherFieldN

如果我尝试在 DataFrame 中加载这两个文件,然后尝试使用简单的 join-where,我本地机器上的作业需要超过 24 小时!这是不可接受的。

ingestedDF.join(consolidatedDF).where($"filename" === $"fileName").count()

¿实现此目标的最佳方法是什么?在进行连接位置计数之前从 DataFrame 中删除列? ¿计算每个数据帧的计数,然后加入并求和?

PD

我正在阅读有关 map-side-joint 技术的内容,但如果有一个小文件能够放入 RAM 中,这种技术似乎对我有用,但我不能保证,所以,我想知道这是社区实现此目标的首选方式。

http://dmtolpeko.com/2015/02/20/map-side-join-in-spark/

我会通过将数据剥离到我感兴趣的字段 (filename) 来解决这个问题,制作一组唯一的文件名及其来源(原始数据集) . 此时,两个中间数据集都具有相同的模式,因此我们可以合并它们并进行计数。这应该比对完整数据使用 join 快几个数量级。

// prepare some random dataset
val data1 = (1 to 100000).filter(_ => scala.util.Random.nextDouble<0.8).map(i => (s"file$i", i, "rubbish"))
val data2 = (1 to 100000).filter(_ => scala.util.Random.nextDouble<0.7).map(i => (s"file$i", i, "crap"))

val df1 = sparkSession.createDataFrame(data1).toDF("filename", "index", "data")
val df2 = sparkSession.createDataFrame(data2).toDF("filename", "index", "data")

// select only the column we are interested in and tag it with the source.
// Lets make it distinct as we are only interested in the unique file count
val df1Filenames = df1.select("filename").withColumn("df", lit("df1")).distinct
val df2Filenames = df2.select("filename").withColumn("df", lit("df2")).distinct

// union both dataframes
val union = df1Filenames.union(df2Filenames).toDF("filename","source")

// let's count the occurrences of filename, by using a groupby operation
val occurrenceCount = union.groupBy("filename").count

// we're interested in the count of those files that appear in both datasets (with a count of 2)
occurrenceCount.filter($"count"===2).count