如何比较两个数据集?

How to compare two datasets?

我是 运行 一个 spark 应用程序,它从几个配置单元表(IP 地址)中读取数据,并将数据集中的每个元素(IP 地址)与其他数据集中的所有其他元素(IP 地址)进行比较.最终结果将类似于:

+---------------+--------+---------------+---------------+---------+----------+--------+----------+
|     ip_address|dataset1|dataset2       |dataset3       |dataset4 |dataset5  |dataset6|      date|
+---------------+--------+---------------+---------------+---------+----------+--------+----------+
| xx.xx.xx.xx.xx|     1  |              1|              0|        0|         0|      0 |2017-11-06|
| xx.xx.xx.xx.xx|     0  |              0|              1|        0|         0|      1 |2017-11-06|
| xx.xx.xx.xx.xx|     1  |              0|              0|        0|         0|      1 |2017-11-06|
| xx.xx.xx.xx.xx|     0  |              0|              1|        0|         0|      1 |2017-11-06|
| xx.xx.xx.xx.xx|     1  |              1|              0|        1|         0|      0 |2017-11-06|
---------------------------------------------------------------------------------------------------

为了进行比较,我将 hiveContext.sql("query") 语句产生的 dataframes 转换为 Fastutil 对象。像这样:

val df= hiveContext.sql("query")
val dfBuffer = new it.unimi.dsi.fastutil.objects.ObjectArrayList[String](df.map(r => r(0).toString).collect())

然后,我使用 iterator 遍历每个集合并使用 FileWriter.

将行写入文件
val dfIterator = dfBuffer.iterator()
while (dfIterator.hasNext){
     val p = dfIterator.next().toString
     //logic
}

我是运行和--num-executors 20 --executor-memory 16g --executor-cores 5 --driver-memory 20g的申请

该过程总共运行大约 18-19 小时,每天对大约 4-5 百万条记录进行一对一比较。

然而,当我检查 Application Master UI 时,我注意到在 dataframesfastutil collection objects 的初始转换完成后没有 activity 发生(这作业启动后只需几分钟)。我看到代码中使用的 countcollect 语句会生成新作业,直到转换完成。之后,比较为运行时,不再启动新作业。

任何帮助将不胜感激,谢谢!

行后:

val dfBuffer = new it.unimi.dsi.fastutil.objects.ObjectArrayList[String](df.map(r => r(0).toString).collect())

尤其是。上面一行的那部分:

df.map(r => r(0).toString).collect()

其中 collect 是需要注意的最主要的事情,没有在 dfBuffer 上执行任何 Spark 作业(这是一个常规的本地 JVM 数据结构)。

Does it mean that the distributed processing is not happening at all?

正确。 collect 将所有数据放在驱动程序运行的单个 JVM 上(这正是你不应该这样做的原因,除非......你知道你在做什么以及它可能导致什么问题)。

我认为以上回答了所有其他问题。


解决比较两个数据集(以 Spark 和分布式方式)的问题的一个可能解决方案是 join 一个具有参考数据集的数据集和 count 比较记录的数量没变。