使用 Spark 转换构建派生列
Building derived column using Spark transformations
我得到了一条 table 记录,如下所述。
Id Indicator Date
1 R 2018-01-20
1 R 2018-10-21
1 P 2019-01-22
2 R 2018-02-28
2 P 2018-05-22
2 P 2019-03-05
我需要选择在过去一年中有两个以上 R
指标的 Id
s 并派生一个名为 Marked_Flag
的新列作为 Y
否则N
。所以预期的输出应该如下所示,
Id Marked_Flag
1 Y
2 N
所以我到目前为止所做的是,我将记录放入一个数据集中,然后再次从中构建另一个数据集。代码如下所示。
Dataset<row> getIndicators = spark.sql("select id, count(indicator) as indi_count from source group by id having indicator = 'R'");
Dataset<row>getFlag = spark.sql("select id, case when indi_count > 1 then 'Y' else 'N' end as Marked_Flag" from getIndicators");
但我领导使用单个数据集和使用 Spark 转换来完成这项工作。我是 Spark 的新手,关于这方面的任何指导或代码片段都会非常有帮助。
创建了两个数据集,一个用于获取聚合,另一个使用聚合值派生新列。
Dataset<row> getIndicators = spark.sql("select id, count(indicator) as indi_count from source group by id having indicator = 'R'");
Dataset<row>getFlag = spark.sql("select id, case when indi_count > 1 then 'Y' else 'N' end as Marked_Flag" from getIndicators");
Input
Expected output
尝试以下操作。请注意,我在这里使用的是 pyspark DataFrame
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([
[1, "R", "2018-01-20"],
[1, "R", "2018-10-21"],
[1, "P", "2019-01-22"],
[2, "R", "2018-02-28"],
[2, "P", "2018-05-22"],
[2, "P", "2019-03-05"]], ["Id", "Indicator","Date"])
gr = df.filter(F.col("Indicator")=="R").groupBy("Id").agg(F.count("Indicator"))
gr = gr.withColumn("Marked_Flag", F.when(F.col("count(Indicator)") > 1, "Y").otherwise('N')).drop("count(Indicator)")
gr.show()
# +---+-----------+
# | Id|Marked_Flag|
# +---+-----------+
# | 1| Y|
# | 2| N|
# +---+-----------+
#
我得到了一条 table 记录,如下所述。
Id Indicator Date
1 R 2018-01-20
1 R 2018-10-21
1 P 2019-01-22
2 R 2018-02-28
2 P 2018-05-22
2 P 2019-03-05
我需要选择在过去一年中有两个以上 R
指标的 Id
s 并派生一个名为 Marked_Flag
的新列作为 Y
否则N
。所以预期的输出应该如下所示,
Id Marked_Flag
1 Y
2 N
所以我到目前为止所做的是,我将记录放入一个数据集中,然后再次从中构建另一个数据集。代码如下所示。
Dataset<row> getIndicators = spark.sql("select id, count(indicator) as indi_count from source group by id having indicator = 'R'");
Dataset<row>getFlag = spark.sql("select id, case when indi_count > 1 then 'Y' else 'N' end as Marked_Flag" from getIndicators");
但我领导使用单个数据集和使用 Spark 转换来完成这项工作。我是 Spark 的新手,关于这方面的任何指导或代码片段都会非常有帮助。
创建了两个数据集,一个用于获取聚合,另一个使用聚合值派生新列。
Dataset<row> getIndicators = spark.sql("select id, count(indicator) as indi_count from source group by id having indicator = 'R'");
Dataset<row>getFlag = spark.sql("select id, case when indi_count > 1 then 'Y' else 'N' end as Marked_Flag" from getIndicators");
Input
Expected output
尝试以下操作。请注意,我在这里使用的是 pyspark DataFrame
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([
[1, "R", "2018-01-20"],
[1, "R", "2018-10-21"],
[1, "P", "2019-01-22"],
[2, "R", "2018-02-28"],
[2, "P", "2018-05-22"],
[2, "P", "2019-03-05"]], ["Id", "Indicator","Date"])
gr = df.filter(F.col("Indicator")=="R").groupBy("Id").agg(F.count("Indicator"))
gr = gr.withColumn("Marked_Flag", F.when(F.col("count(Indicator)") > 1, "Y").otherwise('N')).drop("count(Indicator)")
gr.show()
# +---+-----------+
# | Id|Marked_Flag|
# +---+-----------+
# | 1| Y|
# | 2| N|
# +---+-----------+
#