如何比较两个数据集?
How to compare two datasets?
我是 运行 一个 spark 应用程序,它从几个配置单元表(IP 地址)中读取数据,并将数据集中的每个元素(IP 地址)与其他数据集中的所有其他元素(IP 地址)进行比较.最终结果将类似于:
+---------------+--------+---------------+---------------+---------+----------+--------+----------+
| ip_address|dataset1|dataset2 |dataset3 |dataset4 |dataset5 |dataset6| date|
+---------------+--------+---------------+---------------+---------+----------+--------+----------+
| xx.xx.xx.xx.xx| 1 | 1| 0| 0| 0| 0 |2017-11-06|
| xx.xx.xx.xx.xx| 0 | 0| 1| 0| 0| 1 |2017-11-06|
| xx.xx.xx.xx.xx| 1 | 0| 0| 0| 0| 1 |2017-11-06|
| xx.xx.xx.xx.xx| 0 | 0| 1| 0| 0| 1 |2017-11-06|
| xx.xx.xx.xx.xx| 1 | 1| 0| 1| 0| 0 |2017-11-06|
---------------------------------------------------------------------------------------------------
为了进行比较,我将 hiveContext.sql("query")
语句产生的 dataframes
转换为 Fastutil
对象。像这样:
val df= hiveContext.sql("query")
val dfBuffer = new it.unimi.dsi.fastutil.objects.ObjectArrayList[String](df.map(r => r(0).toString).collect())
然后,我使用 iterator
遍历每个集合并使用 FileWriter
.
将行写入文件
val dfIterator = dfBuffer.iterator()
while (dfIterator.hasNext){
val p = dfIterator.next().toString
//logic
}
我是运行和--num-executors 20 --executor-memory 16g --executor-cores 5 --driver-memory 20g
的申请
该过程总共运行大约 18-19 小时,每天对大约 4-5 百万条记录进行一对一比较。
然而,当我检查 Application Master UI 时,我注意到在 dataframes
到 fastutil collection objects
的初始转换完成后没有 activity 发生(这作业启动后只需几分钟)。我看到代码中使用的 count
和 collect
语句会生成新作业,直到转换完成。之后,比较为运行时,不再启动新作业。
这意味着什么?这是否意味着分布式处理是
根本没有发生?
我了解集合对象不被视为 RDD,可以
这就是这个的原因吗?
spark如何在不使用资源的情况下执行我的程序
分配?
任何帮助将不胜感激,谢谢!
行后:
val dfBuffer = new it.unimi.dsi.fastutil.objects.ObjectArrayList[String](df.map(r => r(0).toString).collect())
尤其是。上面一行的那部分:
df.map(r => r(0).toString).collect()
其中 collect
是需要注意的最主要的事情,没有在 dfBuffer
上执行任何 Spark 作业(这是一个常规的本地 JVM 数据结构)。
Does it mean that the distributed processing is not happening at all?
正确。 collect
将所有数据放在驱动程序运行的单个 JVM 上(这正是你不应该这样做的原因,除非......你知道你在做什么以及它可能导致什么问题)。
我认为以上回答了所有其他问题。
解决比较两个数据集(以 Spark 和分布式方式)的问题的一个可能解决方案是 join
一个具有参考数据集的数据集和 count
比较记录的数量没变。
我是 运行 一个 spark 应用程序,它从几个配置单元表(IP 地址)中读取数据,并将数据集中的每个元素(IP 地址)与其他数据集中的所有其他元素(IP 地址)进行比较.最终结果将类似于:
+---------------+--------+---------------+---------------+---------+----------+--------+----------+
| ip_address|dataset1|dataset2 |dataset3 |dataset4 |dataset5 |dataset6| date|
+---------------+--------+---------------+---------------+---------+----------+--------+----------+
| xx.xx.xx.xx.xx| 1 | 1| 0| 0| 0| 0 |2017-11-06|
| xx.xx.xx.xx.xx| 0 | 0| 1| 0| 0| 1 |2017-11-06|
| xx.xx.xx.xx.xx| 1 | 0| 0| 0| 0| 1 |2017-11-06|
| xx.xx.xx.xx.xx| 0 | 0| 1| 0| 0| 1 |2017-11-06|
| xx.xx.xx.xx.xx| 1 | 1| 0| 1| 0| 0 |2017-11-06|
---------------------------------------------------------------------------------------------------
为了进行比较,我将 hiveContext.sql("query")
语句产生的 dataframes
转换为 Fastutil
对象。像这样:
val df= hiveContext.sql("query")
val dfBuffer = new it.unimi.dsi.fastutil.objects.ObjectArrayList[String](df.map(r => r(0).toString).collect())
然后,我使用 iterator
遍历每个集合并使用 FileWriter
.
val dfIterator = dfBuffer.iterator()
while (dfIterator.hasNext){
val p = dfIterator.next().toString
//logic
}
我是运行和--num-executors 20 --executor-memory 16g --executor-cores 5 --driver-memory 20g
的申请
该过程总共运行大约 18-19 小时,每天对大约 4-5 百万条记录进行一对一比较。
然而,当我检查 Application Master UI 时,我注意到在 dataframes
到 fastutil collection objects
的初始转换完成后没有 activity 发生(这作业启动后只需几分钟)。我看到代码中使用的 count
和 collect
语句会生成新作业,直到转换完成。之后,比较为运行时,不再启动新作业。
这意味着什么?这是否意味着分布式处理是 根本没有发生?
我了解集合对象不被视为 RDD,可以
这就是这个的原因吗?spark如何在不使用资源的情况下执行我的程序 分配?
任何帮助将不胜感激,谢谢!
行后:
val dfBuffer = new it.unimi.dsi.fastutil.objects.ObjectArrayList[String](df.map(r => r(0).toString).collect())
尤其是。上面一行的那部分:
df.map(r => r(0).toString).collect()
其中 collect
是需要注意的最主要的事情,没有在 dfBuffer
上执行任何 Spark 作业(这是一个常规的本地 JVM 数据结构)。
Does it mean that the distributed processing is not happening at all?
正确。 collect
将所有数据放在驱动程序运行的单个 JVM 上(这正是你不应该这样做的原因,除非......你知道你在做什么以及它可能导致什么问题)。
我认为以上回答了所有其他问题。
解决比较两个数据集(以 Spark 和分布式方式)的问题的一个可能解决方案是 join
一个具有参考数据集的数据集和 count
比较记录的数量没变。