How to find the sum/avg of the SparseVector elements of a DataFrame in Spark/Scala?
I have the pageranks result from ParallelPersonalizedPageRank in GraphFrames. It is a DataFrame in which each element is a SparseVector, like this:
+---------------------------------------+
| pageranks |
+---------------------------------------+
|(1887,[0, 1, 2,...][0.1, 0.2, 0.3, ...]|
|(1887,[0, 1, 2,...][0.2, 0.3, 0.4, ...]|
|(1887,[0, 1, 2,...][0.3, 0.4, 0.5, ...]|
|(1887,[0, 1, 2,...][0.4, 0.5, 0.6, ...]|
|(1887,[0, 1, 2,...][0.5, 0.6, 0.7, ...]|
What is the best way to add up all the elements of each SparseVector and produce a sum or an average? I suppose we could convert each SparseVector to a dense array with toArray and traverse the arrays with two nested loops, ending up with something like this (a rough sketch of that idea follows the table below):
+-----------+
|pageranks  |
+-----------+
|avg1       |
|avg2       |
|avg3       |
|avg4       |
|avg5       |
|...        |
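For reference, a minimal sketch of that nested-loop idea might look like the following (pageranksDF is a hypothetical name for the DataFrame above; this collects the rows to the driver, so it only works if the result fits in memory):

import org.apache.spark.ml.linalg.Vector

// Outer loop: one collected row per vertex
val rows = pageranksDF.select("pageranks").collect()
val avgs = rows.map { row =>
  val arr = row.getAs[Vector]("pageranks").toArray  // dense copy of this row's vector
  var sum = 0.0
  for (x <- arr) sum += x                           // inner loop over the elements
  sum / arr.length
}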
I am sure there must be a better way, but I could not find much about SparseVector operations in the API docs. Thanks!
I think I found a solution that avoids collecting (materializing) the results and running nested loops in Scala. Posting it here in case it helps someone else.
// Needed for the Dataset map and toDF below
import spark.implicits._
import org.apache.spark.ml.linalg.SparseVector

// Convert each Dataset element from a SparseVector to an Array[Double]
val ranksNursingArray = ranksNursing.vertices
  .orderBy("id")
  .select("pageranks")
  .map(r => r(0).asInstanceOf[SparseVector].toArray)

// Compute the average of each pageranks vector and add it as a second column
val ranksNursingAvg = ranksNursingArray
  .map(value => (value, value.sum / value.length))
  .toDF("pageranks", "pr-avg")
The final result looks like this:
+--------------------+--------------------+
| pageranks| pr-avg|
+--------------------+--------------------+
|[1.52034575371428...|2.970332668789975E-5|
|[0.0, 0.0, 0.0, 0...|5.160299770346173E-6|
|[0.0, 0.0, 0.0, 0...|4.400537827779479E-6|
|[0.0, 0.0, 0.0, 0...|3.010621958524792...|
|[0.0, 0.0, 4.8987...|2.342424435412115E-5|
|[0.0, 0.0, 1.6895...|6.955151139681538E-6|
|[0.0, 0.0, 1.5669...| 5.47016001804886E-6|
|[0.0, 0.0, 0.0, 2...|2.303811469709906E-5|
|[0.0, 0.0, 0.0, 3...|1.985155979369427E-5|
|[0.0, 0.0, 0.0, 0...|1.411993797780601...|
+--------------------+--------------------+
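Not from the original post, but as a possible alternative sketch: a UDF over the vector column can compute the same per-row average inside the DataFrame API, without mapping to a typed Dataset of arrays first (the ranksNursing GraphFrame and its pageranks column are assumed from above; vectorAvg and ranksNursingAvg2 are hypothetical names):

import org.apache.spark.ml.linalg.{SparseVector, Vector}
import org.apache.spark.sql.functions.{col, udf}

// Average over all elements of the vector; for a SparseVector the implicit zeros
// contribute nothing to the sum, so summing the stored values is enough.
val vectorAvg = udf { v: Vector =>
  v match {
    case sv: SparseVector => sv.values.sum / sv.size
    case dv               => dv.toArray.sum / dv.size
  }
}

val ranksNursingAvg2 = ranksNursing.vertices
  .orderBy("id")
  .select(col("pageranks"), vectorAvg(col("pageranks")).as("pr-avg"))

This stays in untyped DataFrame operations and, for sparse vectors, only reads the stored values rather than materializing a dense array per row.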