Spark Dataframes:将条件列添加到数据框
Spark Dataframes: Add Conditional column to dataframe
我想给dataframe A添加一个条件列Flag
,满足下面两个条件时,给Flag
加1,否则0:
数据帧 A 的 num
介于数据帧 B 的 numStart
和 numEnd
之间。
如果满足以上条件,检查include
是否为1
DataFrame A(这是一个非常大的数据框,包含数百万行):
+----+------+-----+------------------------+
|num |food |price|timestamp |
+----+------+-----+------------------------+
|1275|tomato|1.99 |2018-07-21T00:00:00.683Z|
|145 |carrot|0.45 |2018-07-21T00:00:03.346Z|
|2678|apple |0.99 |2018-07-21T01:00:05.731Z|
|6578|banana|1.29 |2018-07-20T01:11:59.957Z|
|1001|taco |2.59 |2018-07-21T01:00:07.961Z|
+----+------+-----+------------------------+
DataFrame B(非常小的DF,只有100行):
+----------+-----------+-------+
|numStart |numEnd |include|
+----------+-----------+-------+
|0 |200 |1 |
|250 |1050 |0 |
|2000 |3000 |1 |
|10001 |15001 |1 |
+----------+-----------+-------+
预期输出:
+----+------+-----+------------------------+----------+
|num |food |price|timestamp |Flag |
+----+------+-----+------------------------+----------+
|1275|tomato|1.99 |2018-07-21T00:00:00.683Z|0 |
|145 |carrot|0.45 |2018-07-21T00:00:03.346Z|1 |
|2678|apple |0.99 |2018-07-21T01:00:05.731Z|1 |
|6578|banana|1.29 |2018-07-20T01:11:59.957Z|0 |
|1001|taco |2.59 |2018-07-21T01:00:07.961Z|0 |
+----+------+-----+------------------------+----------+
在第一个条件下将两个数据帧连接在一起,同时将所有行保留在数据帧 A 中(即使用左连接,请参见下面的代码)。加入后,include
列可以重命名为 Flag
并且其中的任何 NaN 值都设置为 0。两个额外的列 numStart
和 numEnd
被删除。
代码可以这样写:
A.join(B, $"num" >= $"numStart" && $"num" <= $"numEnd", "left")
.withColumnRenamed("include", "Flag")
.drop("numStart", "numEnd")
.na.fill(Map("Flag" -> 0))
您可以根据您在 (i) 中描述的条件将 dfB
左连接到 dfA
,然后使用 withColumn
和 coalesce
函数到 "default" 到 0:
- 找到匹配项的记录将使用匹配
dfB
记录的 include
值
- 没有匹配项的记录将具有
include=null
,根据您的要求,此类记录应具有 Flag=0
,因此我们使用 coalesce
,如果为空 [=34] =] 带有文字的默认值 lit(0)
最后,删除您不感兴趣的 dfB
列:
import org.apache.spark.sql.functions._
import spark.implicits._ // assuming "spark" is your SparkSession
dfA.join(dfB, $"num".between($"numStart", $"numEnd"), "left")
.withColumn("Flag", coalesce($"include", lit(0)))
.drop(dfB.columns: _*)
.show()
// +----+------+-----+--------------------+----+
// | num| food|price| timestamp|Flag|
// +----+------+-----+--------------------+----+
// |1275|tomato| 1.99|2018-07-21T00:00:...| 0|
// | 145|carrot| 0.45|2018-07-21T00:00:...| 1|
// |2678| apple| 0.99|2018-07-21T01:00:...| 1|
// |6578|banana| 1.29|2018-07-20T01:11:...| 0|
// |1001| taco| 2.59|2018-07-21T01:00:...| 0|
// +----+------+-----+--------------------+----+
我想给dataframe A添加一个条件列Flag
,满足下面两个条件时,给Flag
加1,否则0:
-
数据帧 A 的
num
介于数据帧 B 的numStart
和numEnd
之间。如果满足以上条件,检查
include
是否为1
DataFrame A(这是一个非常大的数据框,包含数百万行):
+----+------+-----+------------------------+
|num |food |price|timestamp |
+----+------+-----+------------------------+
|1275|tomato|1.99 |2018-07-21T00:00:00.683Z|
|145 |carrot|0.45 |2018-07-21T00:00:03.346Z|
|2678|apple |0.99 |2018-07-21T01:00:05.731Z|
|6578|banana|1.29 |2018-07-20T01:11:59.957Z|
|1001|taco |2.59 |2018-07-21T01:00:07.961Z|
+----+------+-----+------------------------+
DataFrame B(非常小的DF,只有100行):
+----------+-----------+-------+
|numStart |numEnd |include|
+----------+-----------+-------+
|0 |200 |1 |
|250 |1050 |0 |
|2000 |3000 |1 |
|10001 |15001 |1 |
+----------+-----------+-------+
预期输出:
+----+------+-----+------------------------+----------+
|num |food |price|timestamp |Flag |
+----+------+-----+------------------------+----------+
|1275|tomato|1.99 |2018-07-21T00:00:00.683Z|0 |
|145 |carrot|0.45 |2018-07-21T00:00:03.346Z|1 |
|2678|apple |0.99 |2018-07-21T01:00:05.731Z|1 |
|6578|banana|1.29 |2018-07-20T01:11:59.957Z|0 |
|1001|taco |2.59 |2018-07-21T01:00:07.961Z|0 |
+----+------+-----+------------------------+----------+
在第一个条件下将两个数据帧连接在一起,同时将所有行保留在数据帧 A 中(即使用左连接,请参见下面的代码)。加入后,include
列可以重命名为 Flag
并且其中的任何 NaN 值都设置为 0。两个额外的列 numStart
和 numEnd
被删除。
代码可以这样写:
A.join(B, $"num" >= $"numStart" && $"num" <= $"numEnd", "left")
.withColumnRenamed("include", "Flag")
.drop("numStart", "numEnd")
.na.fill(Map("Flag" -> 0))
您可以根据您在 (i) 中描述的条件将 dfB
左连接到 dfA
,然后使用 withColumn
和 coalesce
函数到 "default" 到 0:
- 找到匹配项的记录将使用匹配
dfB
记录的include
值 - 没有匹配项的记录将具有
include=null
,根据您的要求,此类记录应具有Flag=0
,因此我们使用coalesce
,如果为空 [=34] =] 带有文字的默认值lit(0)
最后,删除您不感兴趣的 dfB
列:
import org.apache.spark.sql.functions._
import spark.implicits._ // assuming "spark" is your SparkSession
dfA.join(dfB, $"num".between($"numStart", $"numEnd"), "left")
.withColumn("Flag", coalesce($"include", lit(0)))
.drop(dfB.columns: _*)
.show()
// +----+------+-----+--------------------+----+
// | num| food|price| timestamp|Flag|
// +----+------+-----+--------------------+----+
// |1275|tomato| 1.99|2018-07-21T00:00:...| 0|
// | 145|carrot| 0.45|2018-07-21T00:00:...| 1|
// |2678| apple| 0.99|2018-07-21T01:00:...| 1|
// |6578|banana| 1.29|2018-07-20T01:11:...| 0|
// |1001| taco| 2.59|2018-07-21T01:00:...| 0|
// +----+------+-----+--------------------+----+