How to set an incrementing id over a set of rows with respect to a column value in Spark
Hi, my dataset is as follows:

My input:
+----------+----------------+
|    id    |      flag      |
+----------+----------------+
|    1     |     false      |
|    2     |     true       |
|    3     |     false      |
|    4     |     true       |
|    5     |     false      |
|    6     |     false      |
|    7     |     true       |
+----------+----------------+
My output:
+----------+----------------+----------------------------+
|    id    |      flag      |          new_col           |
+----------+----------------+----------------------------+
|    1     |     false      |             1              |
|    2     |     true       |             1              |
|    3     |     false      |             3              |
|    4     |     true       |             3              |
|    5     |     false      |             5              |
|    6     |     false      |             6              |
|    7     |     true       |             6              |
+----------+----------------+----------------------------+
Every false value changes the new_col value to its own id, and so on for the rows that follow...

Any help?
For a smaller dataset, you can do the following:

Use when-otherwise together with withColumn to create a new column that takes the value of id or null depending on the value of flag. The SQL equivalent is:
CASE WHEN FLAG = 'TRUE' THEN ID ELSE NULL END AS NEW_COL
df.show
//+---+-----+
//| id| flag|
//+---+-----+
//| 1|false|
//| 2| true|
//| 3| true|
//| 4| true|
//| 5|false|
//| 6| true|
//| 7| true|
//+---+-----+
//Defining a Window over which we will call the function
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
//No partitionBy clause, so all the data will move to a single partition
//You'll also get a warning related to that
val w = Window.orderBy($"id")
//Where `flag` is `false`, new_col takes the value of `id`; elsewhere it is null
//last (with ignoreNulls = true) is then called over the window to fill the nulls
df.withColumn("new_col", when($"flag" === lit(false), $"id").otherwise(null))
  .withColumn("new_col", coalesce($"new_col", last($"new_col", true).over(w)))
.show
//+---+-----+-------+
//|id |flag |new_col|
//+---+-----+-------+
//|1 |false|1 |
//|2 |true |1 |
//|3 |true |1 |
//|4 |true |1 |
//|5 |false|5 |
//|6 |true |5 |
//|7 |true |5 |
//+---+-----+-------+
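Stripped of Spark, the last-with-ignoreNulls trick is just a forward fill of the most recent false row's id. A minimal plain-Scala sketch of the same idea, run on the sample data from the question (no Spark required, no window warning):

```scala
// Rows as (id, flag), already in id order.
val rows = Seq((1, false), (2, true), (3, false), (4, true),
               (5, false), (6, false), (7, true))

// scanLeft threads the id of the last flag == false row through the
// sequence, which is what last($"new_col", true).over(w) does per row.
val filled = rows.scanLeft((0, true, 0)) { case ((_, _, carry), (id, flag)) =>
  val newCol = if (!flag) id else carry
  (id, flag, newCol)
}.tail

filled.foreach { case (id, flag, newCol) => println(s"$id $flag $newCol") }
```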
If you want to go the rdd way, then you can pass all the data to one executor and do a for loop, as below:
df.rdd.coalesce(1).mapPartitions(iterator => {
  //y holds the id of the last row seen whose flag was false
  var y = "1"
  for (x <- iterator) yield {
    val id = x.getAs[String]("id")
    val flag = x.getAs[Boolean]("flag")
    if (!flag) {
      y = id
    }
    newdf(id, flag, y)
  }
}).toDF()
For this you'll need a case class:
case class newdf(id: String, flag: Boolean, new_id: String)
You can also do it without a case class, but I prefer using one.
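The single-partition mapPartitions loop above boils down to a stateful pass over an iterator. A self-contained sketch of just that pattern, with a hypothetical Row case class standing in for the real Spark Row and schema:

```scala
// Hypothetical stand-ins for the Spark Row and the result record.
case class Row(id: String, flag: Boolean)
case class newdf(id: String, flag: Boolean, new_id: String)

val it = Iterator(Row("1", false), Row("2", true), Row("3", false), Row("4", true))

// Same mutable-variable pattern as the mapPartitions body: y remembers
// the id of the last false row and is stamped onto every yielded record.
var y = "1"
val out = (for (x <- it) yield {
  if (!x.flag) y = x.id
  newdf(x.id, x.flag, y)
}).toList
```

This only works because all rows pass through one iterator in order, which is exactly why the answer coalesces to a single partition first.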