Sum the Distance in Apache-Spark dataframes
The following code produces a DataFrame in which every column contains three values, as shown below.
import org.graphframes._
import org.apache.spark.sql.DataFrame
val v = sqlContext.createDataFrame(List(
("1", "Al"),
("2", "B"),
("3", "C"),
("4", "D"),
("5", "E")
)).toDF("id", "name")
val e = sqlContext.createDataFrame(List(
("1", "3", 5),
("1", "2", 8),
("2", "3", 6),
("2", "4", 7),
("2", "1", 8),
("3", "1", 5),
("3", "2", 6),
("4", "2", 7),
("4", "5", 8),
("5", "4", 8)
)).toDF("src", "dst", "property")
val g = GraphFrame(v, e)
val paths: DataFrame = g.bfs.fromExpr("id = '1'").toExpr("id = '5'").run()
paths.show()
val df = paths
df.select(df.columns.filter(_.startsWith("e")).map(df(_)) : _*).show
The output of the above code is as follows:
+-------+-------+-------+
| e0| e1| e2|
+-------+-------+-------+
|[1,2,8]|[2,4,7]|[4,5,8]|
+-------+-------+-------+
In the output above, each column contains three values, which can be interpreted as follows:
e0: source 1, destination 2, distance 8
e1: source 2, destination 4, distance 7
e2: source 4, destination 5, distance 8
Basically, e0, e1, and e2 are the edges. I want to sum the third element of each column, that is, add up the distance of every edge to get the total distance. How can I do this?
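A quick way to verify this structure (assuming the standard GraphFrames BFS output, in which each eN column mirrors the edge DataFrame's schema) is to print the schema:

paths.printSchema()
// each eN should appear as struct<src:string,dst:string,property:int>,
// matching the edge DataFrame defined above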
I would collect the columns to sum, then use foldLeft with a UDF:
scala> val df = Seq((Array(1,2,8),Array(2,4,7),Array(4,5,8))).toDF("e0", "e1", "e2")
df: org.apache.spark.sql.DataFrame = [e0: array<int>, e1: array<int>, e2: array<int>]
scala> df.show
+---------+---------+---------+
| e0| e1| e2|
+---------+---------+---------+
|[1, 2, 8]|[2, 4, 7]|[4, 5, 8]|
+---------+---------+---------+
scala> val colsToSum = df.columns
colsToSum: Array[String] = Array(e0, e1, e2)
scala> val accLastUDF = udf((acc: Int, col: Seq[Int]) => acc + col.last)
accLastUDF: org.apache.spark.sql.UserDefinedFunction = UserDefinedFunction(<function2>,IntegerType,List(IntegerType, ArrayType(IntegerType,false)))
scala> df.withColumn("dist", colsToSum.foldLeft(lit(0))((acc, colName) => accLastUDF(acc, col(colName)))).show
+---------+---------+---------+----+
| e0| e1| e2|dist|
+---------+---------+---------+----+
|[1, 2, 8]|[2, 4, 7]|[4, 5, 8]| 23|
+---------+---------+---------+----+
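The demo above uses array columns for simplicity; in the question the eN columns are structs, so the same foldLeft pattern would read the property field from a Row instead of taking the array's last element. A sketch under that assumption:

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{col, lit, udf}

// Struct columns are passed into a Scala UDF as Row values,
// so read the "property" field rather than col.last.
val accPropUDF = udf((acc: Int, e: Row) => acc + e.getAs[Int]("property"))

val edgeCols = paths.columns.filter(_.startsWith("e"))
paths.withColumn("dist",
  edgeCols.foldLeft(lit(0))((acc, c) => accPropUDF(acc, col(c)))
).show()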
You can do it like this:
import org.apache.spark.sql.functions.col

// Sum the "property" field across all edge columns as a single Column expression.
val total = df.columns.filter(_.startsWith("e"))
  .map(c => col(s"$c.property")) // or col(c).getItem("property")
  .reduce(_ + _)

df.withColumn("total", total)
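This stays in the native Column API, so no UDF is needed and Catalyst can optimize the sum. Applied to the paths DataFrame from the question, the result should be 8 + 7 + 8 = 23, matching the first answer; a minimal usage sketch:

paths.withColumn("total",
  paths.columns.filter(_.startsWith("e"))
    .map(c => col(s"$c.property"))
    .reduce(_ + _)
).select("total").show()
// expected: a single row with total = 23 for the path 1 -> 2 -> 4 -> 5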