如何指向或 select 数据框中的单元格,Spark - Scala
How to point or select a cell in a dataframe, Spark - Scala
我想求2个小区的时差。
在 python 中使用 arrays
我会做一个 for loop
st[i+1] - st[i]
并将结果存储在某个地方。
我有这个数据框按时间排序。 Spark 2
or Scala
怎么实现,一个伪代码就够了
+--------------------+-------+
| st| name|
+--------------------+-------+
|15:30 |dog |
|15:32 |dog |
|18:33 |dog |
|18:34 |dog |
+--------------------+-------+
类似于:
object Data1 {
import org.apache.log4j.Logger
import org.apache.log4j.Level
Logger.getLogger("org").setLevel(Level.OFF)
Logger.getLogger("akka").setLevel(Level.OFF)
def main(args: Array[String]) : Unit = {
implicit val spark: SparkSession =
SparkSession
.builder()
.appName("Test")
.master("local[1]")
.getOrCreate()
import org.apache.spark.sql.functions.col
val rows = Seq(Row(1, 1), Row(1, 1), Row(1, 1))
val schema = List(StructField("int1", IntegerType, true), StructField("int2", IntegerType, true))
val someDF = spark.createDataFrame(
spark.sparkContext.parallelize(rows),
StructType(schema)
)
someDF.withColumn("diff", col("int1") - col("int2")).show()
}
}
给予
+----+----+----+
|int1|int2|diff|
+----+----+----+
| 1| 1| 0|
| 1| 1| 0|
| 1| 1| 0|
+----+----+----+
如果您特别希望比较集合中的相邻元素,那么在 Scala 中,我会用尾部压缩集合以提供包含相邻对元组的集合。
不幸的是,RDD 上没有 tail 方法或 DataFrames/Sets
你可以这样做:
val a = myDF.rdd
val tail = myDF.rdd.zipWithIndex.collect{
case (index, v) if index > 1 => v}
a.zip(tail).map{ case (l, r) => /* diff l and r st column */}.collect
如果name
每个分区计算滑动diff
s,我会使用lag()
Window函数:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
val df = Seq(
("a", 100), ("a", 120),
("b", 200), ("b", 240), ("b", 270)
).toDF("name", "value")
val window = Window.partitionBy($"name").orderBy("value")
df.
withColumn("diff", $"value" - lag($"value", 1).over(window)).
na.fill(0).
orderBy("name", "value").
show
// +----+-----+----+
// |name|value|diff|
// +----+-----+----+
// | a| 100| 0|
// | a| 120| 20|
// | b| 200| 0|
// | b| 240| 40|
// | b| 270| 30|
// +----+-----+----+
另一方面,如果要在整个数据集上计算滑动 diff
,没有分区的 Window 函数将无法扩展,因此我会求助于使用 RDD 的 sliding()
函数:
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.mllib.rdd.RDDFunctions._
val rdd = df.rdd
val diffRDD = rdd.sliding(2).
map{ case Array(x, y) => Row(y.getString(0), y.getInt(1), y.getInt(1) - x.getInt(1)) }
val headRDD = sc.parallelize(Seq(Row.fromSeq(rdd.first.toSeq :+ 0)))
val headDF = spark.createDataFrame(headRDD, df.schema.add("diff", IntegerType))
val diffDF = spark.createDataFrame(diffRDD, df.schema.add("diff", IntegerType))
val resultDF = headDF union diffDF
resultDF.show
// +----+-----+----+
// |name|value|diff|
// +----+-----+----+
// | a| 100| 0|
// | a| 120| 20|
// | b| 200| 80|
// | b| 240| 40|
// | b| 270| 30|
// +----+-----+----+
我想求2个小区的时差。
在 python 中使用 arrays
我会做一个 for loop
st[i+1] - st[i]
并将结果存储在某个地方。
我有这个数据框按时间排序。 Spark 2
or Scala
怎么实现,一个伪代码就够了
+--------------------+-------+
| st| name|
+--------------------+-------+
|15:30 |dog |
|15:32 |dog |
|18:33 |dog |
|18:34 |dog |
+--------------------+-------+
类似于:
object Data1 {
import org.apache.log4j.Logger
import org.apache.log4j.Level
Logger.getLogger("org").setLevel(Level.OFF)
Logger.getLogger("akka").setLevel(Level.OFF)
def main(args: Array[String]) : Unit = {
implicit val spark: SparkSession =
SparkSession
.builder()
.appName("Test")
.master("local[1]")
.getOrCreate()
import org.apache.spark.sql.functions.col
val rows = Seq(Row(1, 1), Row(1, 1), Row(1, 1))
val schema = List(StructField("int1", IntegerType, true), StructField("int2", IntegerType, true))
val someDF = spark.createDataFrame(
spark.sparkContext.parallelize(rows),
StructType(schema)
)
someDF.withColumn("diff", col("int1") - col("int2")).show()
}
}
给予
+----+----+----+
|int1|int2|diff|
+----+----+----+
| 1| 1| 0|
| 1| 1| 0|
| 1| 1| 0|
+----+----+----+
如果您特别希望比较集合中的相邻元素,那么在 Scala 中,我会用尾部压缩集合以提供包含相邻对元组的集合。
不幸的是,RDD 上没有 tail 方法或 DataFrames/Sets
你可以这样做:
val a = myDF.rdd
val tail = myDF.rdd.zipWithIndex.collect{
case (index, v) if index > 1 => v}
a.zip(tail).map{ case (l, r) => /* diff l and r st column */}.collect
如果name
每个分区计算滑动diff
s,我会使用lag()
Window函数:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
val df = Seq(
("a", 100), ("a", 120),
("b", 200), ("b", 240), ("b", 270)
).toDF("name", "value")
val window = Window.partitionBy($"name").orderBy("value")
df.
withColumn("diff", $"value" - lag($"value", 1).over(window)).
na.fill(0).
orderBy("name", "value").
show
// +----+-----+----+
// |name|value|diff|
// +----+-----+----+
// | a| 100| 0|
// | a| 120| 20|
// | b| 200| 0|
// | b| 240| 40|
// | b| 270| 30|
// +----+-----+----+
另一方面,如果要在整个数据集上计算滑动 diff
,没有分区的 Window 函数将无法扩展,因此我会求助于使用 RDD 的 sliding()
函数:
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.mllib.rdd.RDDFunctions._
val rdd = df.rdd
val diffRDD = rdd.sliding(2).
map{ case Array(x, y) => Row(y.getString(0), y.getInt(1), y.getInt(1) - x.getInt(1)) }
val headRDD = sc.parallelize(Seq(Row.fromSeq(rdd.first.toSeq :+ 0)))
val headDF = spark.createDataFrame(headRDD, df.schema.add("diff", IntegerType))
val diffDF = spark.createDataFrame(diffRDD, df.schema.add("diff", IntegerType))
val resultDF = headDF union diffDF
resultDF.show
// +----+-----+----+
// |name|value|diff|
// +----+-----+----+
// | a| 100| 0|
// | a| 120| 20|
// | b| 200| 80|
// | b| 240| 40|
// | b| 270| 30|
// +----+-----+----+