How to set an increment id over a set of rows with respect to a column value in Spark

Hi, my dataset is as below:

My input:

+----------+----------------+
|  id      |  flag          |
+----------+----------------+
|  1       | false          |
+----------+----------------+
|  2       | true           |
+----------+----------------+
|  3       | false          |
+----------+----------------+
|  4       | true           |
+----------+----------------+
|  5       | false          |
+----------+----------------+
|  6       | false          |
+----------+----------------+
|  7       | true           |
+----------+----------------+

Output:

+----------+----------------+----------------------------+
|  id      |  flag          |  new_col                   |
+----------+----------------+----------------------------+
|  1       | false          |      1                     |
+----------+----------------+----------------------------+
|  2       | true           |      1                     |
+----------+----------------+----------------------------+
|  3       | false          |      3                     |
+----------+----------------+----------------------------+
|  4       | true           |      3                     |
+----------+----------------+----------------------------+
|  5       | false          |      5                     |
+----------+----------------+----------------------------+
|  6       | false          |      6                     |
+----------+----------------+----------------------------+
|  7       | true           |      6                     |
+----------+----------------+----------------------------+

Every false value changes the new_col value to its own id, and so on... Any help?

For a smaller dataset, you can do the following:

  1. Use when-otherwise with withColumn to create a new column that takes the value of id when flag is false and null otherwise. The SQL equivalent would be:
CASE WHEN FLAG = 'FALSE' THEN ID ELSE NULL END AS NEW_COL
  2. Then use coalesce with last over a Window to replace all the nulls with the last non-null value:
df.show
//+---+-----+
//| id| flag|
//+---+-----+
//|  1|false|
//|  2| true|
//|  3| true|
//|  4| true|
//|  5|false|
//|  6| true|
//|  7| true|
//+---+-----+

//Defining a Window over which we will call the function
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

//No partitionBy clause so all the data will move to a single partition
//You'll also get a warning related to that
val w = Window.orderBy($"id")

//new_col takes the value of `id` where `flag` is `false`, and null otherwise
//last (with ignoreNulls = true) is then called over the window to carry the
//previous non-null value forward, and coalesce fills in the nulls
df.withColumn("new_col" , when($"flag" === lit(false) , $"id").otherwise(null))
  .withColumn("new_col" , coalesce($"new_col" , last($"new_col", true).over(w) ) )
  .show
//+---+-----+-------+
//|id |flag |new_col|
//+---+-----+-------+
//|1  |false|1      |
//|2  |true |1      |
//|3  |true |1      |
//|4  |true |1      |
//|5  |false|5      |
//|6  |true |5      |
//|7  |true |5      |
//+---+-----+-------+
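
As a side note, the two withColumn calls can also be collapsed into a single expression over the same window w. This is only a sketch of an equivalent formulation, assuming the same df and window as above:

//Sketch: feed the conditional column straight into last (with ignoreNulls = true),
//so the most recent `id` of a `false` row is carried forward in one step
df.withColumn("new_col" , last(when($"flag" === lit(false) , $"id"), true).over(w))
  .show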

If you want to go the rdd way, then you can move all the data to one executor and do a for loop as below:

//Move all the rows to a single partition and walk them in order,
//remembering the id of the most recent `false` row in a mutable variable
df.rdd.coalesce(1).mapPartitions(iterator => {
  var y = "1"
  for (x <- iterator) yield {
    val id = x.getAs[String]("id")
    val flag = x.getAs[Boolean]("flag")
    if (!flag) {
      y = id               //a `false` row resets the carried id to its own id
    }
    newdf(id, flag, y)
  }
}).toDF()

For this you need a case class:

case class newdf(id:String, flag:Boolean, new_id:String)

It can also be done without a case class, but I prefer using a case class.
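
For example, a minimal sketch without the case class (assuming id is stored as a String, as above) could yield plain tuples and name the columns in toDF:

import spark.implicits._   //assuming a SparkSession named spark, as in spark-shell

df.rdd.coalesce(1).mapPartitions(iterator => {
  var y = "1"
  for (x <- iterator) yield {
    val id = x.getAs[String]("id")
    val flag = x.getAs[Boolean]("flag")
    if (!flag) y = id      //remember the id of the latest `false` row
    (id, flag, y)          //plain tuple instead of the case class
  }
}).toDF("id", "flag", "new_id")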