Transpose rows into columns
I need to transpose rows into columns. There are 2 tables (shown below). Each record in the product table matches 0, 1, or 2 records in the product_segment table. A product can be one of 2 types: HOS or AMB. The requirement is to populate the "segment" value into the corresponding one of 2 columns in the target based on this product type (1 column for HOS, 1 for AMB).
Depending on which product-type records exist in the source, populate the value of HOS_segment or AMB_segment in the target. If both record types exist, populate both fields in the output; otherwise populate only the one that exists.
Suppose the tables are:
product:
product_id | eff_date
12345 | 10/01/2018
75852 | 22/05/2018
33995 | 15/02/2019
product_segment:
product_id | segment | type
12345 | KA | HOS
12345 | HM | AMB
75852 | GB | HOS
33995 | HD | AMB
Expected output:
product_id | eff_date | HOS_segment | AMB_segment
12345 | 10/01/2018 | KA | HM
75852 | 22/05/2018 | GB | Null
33995 | 15/02/2019 | Null | HD
For product 12345, both HOS and AMB records exist, so both columns in the output are populated with the corresponding segments.
For product 75852, only the HOS record exists, so HOS_segment is populated but AMB_segment gets Null.
Finally, for product 33995, it is the exact opposite: AMB_segment is populated but HOS_segment gets Null.
Can anyone help me solve this?
You can use joins with the segment table filtered by type.
import pyspark.sql.functions as F
product \
.join(product_segment.where("type = 'HOS'").select("product_id", F.col("segment").alias("HOS_segment")), "product_id", "left_outer") \
.join(product_segment.where("type = 'AMB'").select("product_id", F.col("segment").alias("AMB_segment")), "product_id", "left_outer")
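A minimal end-to-end sketch of this approach (assuming a running SparkSession named spark, and building the two sample tables from the question purely for illustration):

import pyspark.sql.functions as F

# Sample data copied from the question
product = spark.createDataFrame(
    [(12345, "10/01/2018"), (75852, "22/05/2018"), (33995, "15/02/2019")],
    ["product_id", "eff_date"])
product_segment = spark.createDataFrame(
    [(12345, "KA", "HOS"), (12345, "HM", "AMB"), (75852, "GB", "HOS"), (33995, "HD", "AMB")],
    ["product_id", "segment", "type"])

result = (
    product
    # one left join per type, each contributing its own aliased segment column
    .join(product_segment.where("type = 'HOS'")
                         .select("product_id", F.col("segment").alias("HOS_segment")),
          "product_id", "left_outer")
    .join(product_segment.where("type = 'AMB'")
                         .select("product_id", F.col("segment").alias("AMB_segment")),
          "product_id", "left_outer")
)
result.show()
# Expected (row order may vary):
# product_id | eff_date   | HOS_segment | AMB_segment
# 12345      | 10/01/2018 | KA          | HM
# 75852      | 22/05/2018 | GB          | null
# 33995      | 15/02/2019 | null        | HD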
Instead of using multiple joins with where filters, I would suggest a single join with pivot. Here is the code snippet; have a look.
>>> import pyspark.sql.functions as F
>>> df1= spark.createDataFrame([[12345,"10/01/2018"],[75852,"10/01/2018"],[33995,"10/01/2018"]],["product_id","eff_date"])
>>> df1.show()
+----------+----------+
|product_id| eff_date|
+----------+----------+
| 12345|10/01/2018|
| 75852|10/01/2018|
| 33995|10/01/2018|
+----------+----------+
>>> df2 = spark.createDataFrame([[12345,"KA","HOS"],[12345,"HM","AMB"],[75852,"GB","HOS"],[33995,"HD","AMB"]],["product_id","Segment","type"])
>>> df2.show()
+----------+-------+----+
|product_id|Segment|type|
+----------+-------+----+
| 12345| KA| HOS|
| 12345| HM| AMB|
| 75852| GB| HOS|
| 33995| HD| AMB|
+----------+-------+----+
>>> df1.join(df2,df1.product_id ==df2.product_id,"inner").groupBy(df2.product_id,df1.eff_date).pivot("type").agg(F.first(df2.Segment)).show()
+----------+----------+----+----+
|product_id| eff_date| AMB| HOS|
+----------+----------+----+----+
| 12345|10/01/2018| HM| KA|
| 33995|10/01/2018| HD|null|
| 75852|10/01/2018|null| GB|
+----------+----------+----+----+
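The pivoted columns come out as AMB and HOS rather than the HOS_segment/AMB_segment names asked for in the question. A small sketch on the same df1/df2 (an illustrative variant, not part of the original answer): pass the expected pivot values explicitly and rename the result columns, joining on the column name so product_id is not duplicated.

import pyspark.sql.functions as F

result = (
    df1.join(df2, "product_id", "inner")      # join on the name to avoid duplicate product_id columns
       .groupBy("product_id", "eff_date")
       .pivot("type", ["HOS", "AMB"])         # listing the values lets Spark skip the distinct() pass
       .agg(F.first("Segment"))
       .withColumnRenamed("HOS", "HOS_segment")
       .withColumnRenamed("AMB", "AMB_segment")
)
result.show()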
Spark SQL 2.4+:
>>> df1.registerTempTable("df1_temp")
>>> df2.registerTempTable("df2_temp")
>>> spark.sql("select * from(select a.*,b.segment,b.type from df1_temp a inner join df2_temp b on a.product_id =b.product_id) PIVOT( first(segment) for type in ('HOS' HOS_segment,'AMB' AMB_Segment )) " ).show()
+----------+----------+-----------+-----------+
|product_id| eff_date|HOS_segment|AMB_Segment|
+----------+----------+-----------+-----------+
| 12345|10/01/2018| KA| HM|
| 33995|10/01/2018| null| HD|
| 75852|10/01/2018| GB| null|
+----------+----------+-----------+-----------+
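One caveat for both versions: because of the inner join, a product with no rows at all in the segment table (the 0-match case mentioned in the question) drops out of the output entirely. A left join keeps it, with both segment columns null; a sketch of the Spark SQL variant:

spark.sql("""
    select * from (
        select a.*, b.segment, b.type
        from df1_temp a
        left join df2_temp b on a.product_id = b.product_id
    )
    PIVOT (first(segment) for type in ('HOS' HOS_segment, 'AMB' AMB_segment))
""").show()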
Hope this helps. Let me know if you have any questions about this.