Transpose rows into columns

I need to transpose rows into columns. There are 2 tables (shown below). Each record in the product table matches 0, 1, or 2 records in the product_segment table. There can be 2 types of product - HOS and AMB. The requirement is to populate the "segment" value into the corresponding 2 columns in the target based on the product type (1 column for HOS, 1 column for AMB).

Populate the value of HOS_segment or AMB_segment in the target based on which product-type records exist in the source. If both record types exist, populate both fields in the output; otherwise populate only the one that exists.

Suppose the tables are:

product:

product_id | eff_date
12345 | 10/01/2018
75852 | 22/05/2018
33995 | 15/02/2019

product_segment:

product_id | segment | type
12345 | KA | HOS
12345 | HM | AMB
75852 | GB | HOS
33995 | HD | AMB

Expected output:

product_id | eff_date | HOS_segment | AMB_segment
12345 | 10/01/2018 | KA | HM
75852 | 22/05/2018 | GB | Null
33995 | 15/02/2019 | Null | HD

For product 12345, both HOS and AMB records exist, so in the output both columns are populated with the corresponding segments.
For product 75852, only a HOS record exists, so HOS_segment is populated but AMB_segment gets Null.
Finally, for product 33995 it is the opposite: AMB_segment is populated but HOS_segment gets Null.

Can anyone help me solve this?

You can use two joins against filtered views of the segment table:

import pyspark.sql.functions as F

product \
    .join(product_segment.where("type = 'HOS'")
          .select("product_id", F.col("segment").alias("HOS_segment")),
          "product_id", "left_outer") \
    .join(product_segment.where("type = 'AMB'")
          .select("product_id", F.col("segment").alias("AMB_segment")),
          "product_id", "left_outer")

Instead of using multiple joins with where filters, I would suggest a single join with a pivot. Here is the code snippet; have a look.

>>> import pyspark.sql.functions as F
>>> df1= spark.createDataFrame([[12345,"10/01/2018"],[75852,"10/01/2018"],[33995,"10/01/2018"]],["product_id","eff_date"])
>>> df1.show()
+----------+----------+                                                         
|product_id|  eff_date|
+----------+----------+
|     12345|10/01/2018|
|     75852|10/01/2018|
|     33995|10/01/2018|
+----------+----------+

>>> df2 = spark.createDataFrame([[12345,"KA","HOS"],[12345,"HM","AMB"],[75852,"GB","HOS"],[33995,"HD","AMB"]],["product_id","Segment","type"])
>>> df2.show()

+----------+-------+----+
|product_id|Segment|type|
+----------+-------+----+
|     12345|     KA| HOS|
|     12345|     HM| AMB|
|     75852|     GB| HOS|
|     33995|     HD| AMB|
+----------+-------+----+

>>> df1.join(df2,df1.product_id ==df2.product_id,"inner").groupBy(df2.product_id,df1.eff_date).pivot("type").agg(F.first(df2.Segment)).show()
+----------+----------+----+----+
|product_id|  eff_date| AMB| HOS|
+----------+----------+----+----+
|     12345|10/01/2018|  HM|  KA|
|     33995|10/01/2018|  HD|null|
|     75852|10/01/2018|null|  GB|
+----------+----------+----+----+

Spark-sql 2.4+

>>> df1.createOrReplaceTempView("df1_temp")
>>> df2.createOrReplaceTempView("df2_temp")
>>> spark.sql("select * from(select a.*,b.segment,b.type from df1_temp a inner join df2_temp b on a.product_id =b.product_id) PIVOT( first(segment) for type in ('HOS' HOS_segment,'AMB' AMB_Segment )) " ).show()
+----------+----------+-----------+-----------+
|product_id|  eff_date|HOS_segment|AMB_Segment|
+----------+----------+-----------+-----------+
|     12345|10/01/2018|         KA|         HM|
|     33995|10/01/2018|       null|         HD|
|     75852|10/01/2018|         GB|       null|
+----------+----------+-----------+-----------+

Hope this helps. Let me know if you have any questions related to this.