Dot product in Spark Scala
I have two dataframes in Spark Scala, and the second column of each is an array of numbers:
val data22= Seq((1,List(0.693147,0.6931471)),(2,List(0.69314, 0.0)),(3,List(0.0, 0.693147))).toDF("ID","tf_idf")
data22.show(truncate=false)
+---+---------------------+
|ID |tf_idf               |
+---+---------------------+
|1  |[0.693147, 0.6931471]|
|2  |[0.69314, 0.0]       |
|3  |[0.0, 0.693147]      |
+---+---------------------+
val data12= Seq((1,List(0.69314,0.6931471))).toDF("ID","tf_idf")
data12.show(truncate=false)
+---+--------------------+
|ID |tf_idf              |
+---+--------------------+
|1  |[0.69314, 0.6931471]|
+---+--------------------+
I need to perform a dot product between the rows of these two dataframes. That is, I need to multiply the tf_idf array in data12 with each row of tf_idf in data22.
(For example, the first row of the dot product should be: 0.693147*0.69314 + 0.6931471*0.6931471,
the second row: 0.69314*0.69314 + 0.0*0.6931471,
the third row: 0.0*0.69314 + 0.693147*0.6931471.)
Basically, I want something like the matrix multiplication data22 * transpose(data12).
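To make the target concrete, this is the same product written against spark.mllib's distributed matrix API (just a sketch of what I mean, using the definitions above; not a form I require):

import org.apache.spark.mllib.linalg.{DenseMatrix, Vectors}
import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix}

// Each data22 row becomes one row of a distributed 3 x 2 matrix A
val rows = data22.rdd.map(r => IndexedRow(r.getInt(0).toLong, Vectors.dense(r.getSeq[Double](1).toArray)))
val A = new IndexedRowMatrix(rows)

// The single data12 row becomes a local 2 x 1 matrix B
val B = new DenseMatrix(2, 1, Array(0.69314, 0.6931471))

// A * B yields one dot product per row of data22
A.multiply(B).rows.collect().foreach(println)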
I would appreciate any suggestions on how to do this in Spark Scala.
Thanks
A solution is shown below:
scala> val data22= Seq((1,List(0.693147,0.6931471)),(2,List(0.69314, 0.0)),(3,List(0.0, 0.693147))).toDF("ID","tf_idf")
data22: org.apache.spark.sql.DataFrame = [ID: int, tf_idf: array<double>]
scala> val data12= Seq((1,List(0.69314,0.6931471))).toDF("ID","tf_idf")
data12: org.apache.spark.sql.DataFrame = [ID: int, tf_idf: array<double>]
scala> import scala.collection.mutable.WrappedArray
import scala.collection.mutable.WrappedArray
scala> val arrayDot = data12.take(1).map(row => (row.getAs[Int](0), row.getAs[WrappedArray[Double]](1).toSeq))
arrayDot: Array[(Int, Seq[Double])] = Array((1,WrappedArray(0.69314, 0.6931471)))
scala> val dotColumn = arrayDot(0)._2
dotColumn: Seq[Double] = WrappedArray(0.69314, 0.6931471)
scala> val dotUdf = udf((y: Seq[Double]) => y zip dotColumn map(z => z._1*z._2) reduce(_ + _))
dotUdf: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,DoubleType,Some(List(ArrayType(DoubleType,false))))
scala> data22.withColumn("dotProduct", dotUdf('tf_idf)).show
+---+--------------------+-------------------+
| ID| tf_idf| dotProduct|
+---+--------------------+-------------------+
| 1|[0.693147, 0.6931...| 0.96090081381841|
| 2| [0.69314, 0.0]|0.48044305959999994|
| 3| [0.0, 0.693147]| 0.4804528329237|
+---+--------------------+-------------------+
Note that this multiplies the tf_idf array from data12 with each row of tf_idf in data22.
Let me know if this helps!
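A small variant on the same idea (a sketch, assuming the same spark-shell session as above): instead of capturing dotColumn in the UDF's closure, pass the fixed vector in as a literal array column, which keeps the UDF generic:

import org.apache.spark.sql.functions.{array, lit, udf}

// Two-argument dot product UDF; the fixed vector arrives as an ordinary column
val dot2 = udf((x: Seq[Double], y: Seq[Double]) => x.zip(y).map { case (a, b) => a * b }.sum)
data22.withColumn("dotProduct", dot2('tf_idf, array(dotColumn.map(lit): _*))).show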
Spark Version 2.4+: a couple of the array functions, such as zip_with and aggregate, make your code simpler. To follow your description exactly, I changed the join to a crossJoin.
import org.apache.spark.sql.functions.expr

val data22= Seq((1,List(0.693147,0.6931471)),(2,List(0.69314, 0.0)),(3,List(0.0, 0.693147))).toDF("ID","tf_idf")
val data12= Seq((1,List(0.693,0.805))).toDF("ID2","tf_idf2")
val df = data22.crossJoin(data12).drop("ID2")
df.withColumn("DotProduct", expr("aggregate(zip_with(tf_idf, tf_idf2, (x, y) -> x * y), 0D, (sum, x) -> sum + x)")).show(false)
Here is the result: zip_with multiplies the two arrays element-wise, and aggregate sums those products starting from the double literal 0D.
+---+---------------------+--------------+-------------------+
|ID |tf_idf |tf_idf2 |DotProduct |
+---+---------------------+--------------+-------------------+
|1 |[0.693147, 0.6931471]|[0.693, 0.805]|1.0383342865 |
|2 |[0.69314, 0.0] |[0.693, 0.805]|0.48034601999999993|
|3 |[0.0, 0.693147] |[0.693, 0.805]|0.557983335 |
+---+---------------------+--------------+-------------------+
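If you are on Spark 3.0 or later (an assumption beyond the 2.4 target above), the same pair of functions is also available on the Scala Column API, so the SQL expression string can be dropped:

import org.apache.spark.sql.functions.{aggregate, lit, zip_with}

// Same element-wise multiply, then sum, written with Column functions
df.withColumn("DotProduct", aggregate(zip_with($"tf_idf", $"tf_idf2", _ * _), lit(0.0), _ + _)).show(false)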