如何指向或 select 数据框中的单元格,Spark - Scala

How to point or select a cell in a dataframe, Spark - Scala

我想求2个小区的时差。

在 python 中使用 arrays 我会做一个 for loop st[i+1] - st[i] 并将结果存储在某个地方。

我有这个数据框按时间排序。 Spark 2 or Scala 怎么实现,一个伪代码就够了

+--------------------+-------+
|                  st|   name|
+--------------------+-------+
|15:30               |dog    |
|15:32               |dog    |
|18:33               |dog    |
|18:34               |dog    |
+--------------------+-------+

类似于:

object Data1 {

  import org.apache.log4j.Logger
  import org.apache.log4j.Level

  Logger.getLogger("org").setLevel(Level.OFF)
  Logger.getLogger("akka").setLevel(Level.OFF)

  def main(args: Array[String]) : Unit = {
    implicit val spark: SparkSession =
      SparkSession
        .builder()
        .appName("Test")
        .master("local[1]")
        .getOrCreate()

    import org.apache.spark.sql.functions.col

    val rows = Seq(Row(1, 1), Row(1, 1), Row(1, 1))
    val schema = List(StructField("int1", IntegerType, true), StructField("int2", IntegerType, true))

    val someDF = spark.createDataFrame(
      spark.sparkContext.parallelize(rows),
      StructType(schema)
    )

    someDF.withColumn("diff", col("int1") - col("int2")).show()
  }
}

给予

+----+----+----+
|int1|int2|diff|
+----+----+----+
|   1|   1|   0|
|   1|   1|   0|
|   1|   1|   0|
+----+----+----+

如果您特别希望比较集合中的相邻元素,那么在 Scala 中,我会用尾部压缩集合以提供包含相邻对元组的集合。

不幸的是,RDD 上没有 tail 方法或 DataFrames/Sets

你可以这样做:

val a = myDF.rdd
val tail = myDF.rdd.zipWithIndex.collect{
  case (index, v) if index > 1 => v}

a.zip(tail).map{ case (l, r) => /* diff l and r st column */}.collect

如果name每个分区计算滑动diffs,我会使用lag()Window函数:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

val df = Seq(
  ("a", 100), ("a", 120),
  ("b", 200), ("b", 240), ("b", 270)
).toDF("name", "value")

val window = Window.partitionBy($"name").orderBy("value")

df.
  withColumn("diff", $"value" - lag($"value", 1).over(window)).
  na.fill(0).
  orderBy("name", "value").
  show
// +----+-----+----+
// |name|value|diff|
// +----+-----+----+
// |   a|  100|   0|
// |   a|  120|  20|
// |   b|  200|   0|
// |   b|  240|  40|
// |   b|  270|  30|
// +----+-----+----+

另一方面,如果要在整个数据集上计算滑动 diff,没有分区的 Window 函数将无法扩展,因此我会求助于使用 RDD 的 sliding()函数:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.mllib.rdd.RDDFunctions._

val rdd = df.rdd

val diffRDD = rdd.sliding(2).
  map{ case Array(x, y) => Row(y.getString(0), y.getInt(1), y.getInt(1) - x.getInt(1)) }

val headRDD = sc.parallelize(Seq(Row.fromSeq(rdd.first.toSeq :+ 0)))

val headDF = spark.createDataFrame(headRDD, df.schema.add("diff", IntegerType))
val diffDF = spark.createDataFrame(diffRDD, df.schema.add("diff", IntegerType))

val resultDF = headDF union diffDF
resultDF.show
// +----+-----+----+
// |name|value|diff|
// +----+-----+----+
// |   a|  100|   0|
// |   a|  120|  20|
// |   b|  200|  80|
// |   b|  240|  40|
// |   b|  270|  30|
// +----+-----+----+