Nested Pivot in Spark Dataframe
I have the following two DataFrames:
MasterDF
NumberDF (created by loading from Hive)
Expected output:
Populate logic:
For Field1, select sch_id where CAT='PAY' and SUB_CAT='client'
For Field2, select sch_id where CAT='PAY' and SUB_CAT='phr'
For Field3, select pay_id where CAT='credit' and SUB_CAT='spGrp'
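To make the three rules concrete, here is a minimal plain-Python model of them (no Spark involved). The sample rows and their values are hypothetical, and the helper name `fill_fields` is made up for illustration; only the column names and the rule table come from the question.

```python
# Hypothetical sample rows standing in for NumberDF; the values are made up.
number_rows = [
    {"id": 1, "CAT": "PAY", "SUB_CAT": "client", "sch_id": 2, "pay_id": 102},
    {"id": 1, "CAT": "PAY", "SUB_CAT": "phr", "sch_id": 1, "pay_id": 101},
    {"id": 1, "CAT": "credit", "SUB_CAT": "spGrp", "sch_id": 6, "pay_id": 106},
    {"id": 2, "CAT": "PAY", "SUB_CAT": "phr", "sch_id": 7, "pay_id": 107},
]

# (CAT, SUB_CAT) -> (output field, source column), taken from the rules above.
RULES = {
    ("PAY", "client"): ("Field1", "sch_id"),
    ("PAY", "phr"): ("Field2", "sch_id"),
    ("credit", "spGrp"): ("Field3", "pay_id"),
}


def fill_fields(rows):
    """Return {id: {Field1, Field2, Field3}}; a field with no matching row stays None."""
    result = {}
    for row in rows:
        fields = result.setdefault(
            row["id"], {"Field1": None, "Field2": None, "Field3": None}
        )
        rule = RULES.get((row["CAT"], row["SUB_CAT"]))
        if rule is not None:
            target, source = rule
            fields[target] = row[source]
    return result


filled = fill_fields(number_rows)
print(filled)
```

An id with no matching row for a rule keeps None in that field, which is the same result a left outer join would give for it.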
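Currently I filter NumberDF before each join and select the value, for example:
masterDF.as("master")
  // PAY / phr is the Field2 rule; each of the other fields needs its own filtered join
  .join(NumberDF.filter(col("CAT") === "PAY" && col("SUB_CAT") === "phr").as("number"), $"master.id" === $"number.id", "leftouter")
  .select($"master.*", $"number.sch_id".as("field2"))
This approach needs a separate join for each field. I looked into the pivot function, but it did not solve my problem.
Note: please ignore any syntax errors in the code.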
A better solution is to pivot the DataFrame (numberDF) on the column (subject) before joining it with studentDF.
The PySpark code looks like this:
numberDF = spark.createDataFrame([(1, "Math", 80), (1, "English", 60), (1, "Science", 80)], ["id", "subject", "marks"])
studentDF = spark.createDataFrame([(1, "Vikas")],["id","name"])
>>> numberDF.show()
+---+-------+-----+
| id|subject|marks|
+---+-------+-----+
|  1|   Math|   80|
|  1|English|   60|
|  1|Science|   80|
+---+-------+-----+
>>> studentDF.show()
+---+-----+
| id| name|
+---+-----+
|  1|Vikas|
+---+-----+
pivotNumberDF = numberDF.groupBy("id").pivot("subject").sum("marks")
>>> pivotNumberDF.show()
+---+-------+----+-------+
| id|English|Math|Science|
+---+-------+----+-------+
|  1|     60|  80|     80|
+---+-------+----+-------+
>>> studentDF.join(pivotNumberDF, "id").show()
+---+-----+-------+----+-------+
| id| name|English|Math|Science|
+---+-----+-------+----+-------+
|  1|Vikas|     60|  80|     80|
+---+-----+-------+----+-------+
Reference: http://spark.apache.org/docs/2.4.0/api/python/pyspark.sql.html
// Register each DataFrame as its own temp view.
NumberDF.createOrReplaceTempView("NumberDF")
masterDF.createOrReplaceTempView("MasterDf")
// String comparison is case-sensitive, so the literals must match the stored CAT / SUB_CAT values.
// m.otherfields is a placeholder for the remaining MasterDf columns.
val sqlDF = spark.sql("""
  select m.id, t1.fld1, t2.fld2, t3.fld3, m.otherfields
  from MasterDf m
  left join (select id, sch_id as fld1 from NumberDF where cat = 'pay' and sub_cat = 'client') t1 on m.id = t1.id
  left join (select id, sch_id as fld2 from NumberDF where cat = 'pay' and sub_cat = 'phr') t2 on m.id = t2.id
  left join (select id, pay_id as fld3 from NumberDF where cat = 'credit' and sub_cat = 'spGrp') t3 on m.id = t3.id
""")
sqlDF.show()
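I finally implemented it using pivot:
import org.apache.spark.sql.functions.avg

// flights is assumed to hold the NumberDF rows (ID, CAT, SUB_CAT, SCH_ID, pay_id).
flights.groupBy("ID", "CAT")
  // First pivot: one pair of columns per SUB_CAT, e.g. client_SCH_ID and client_pay_id.
  .pivot("SUB_CAT", Seq("client", "phr", "spGrp"))
  .agg(avg("SCH_ID").as("SCH_ID"), avg("pay_id").as("pay_id"))
  .groupBy("ID")
  // Second pivot: prefix each of those columns with the CAT value.
  .pivot("CAT", Seq("credit", "pay"))
  .agg(
    avg("client_SCH_ID").as("client_sch_id"), avg("client_pay_id").as("client_pay_id"),
    avg("phr_SCH_ID").as("phr_SCH_ID"), avg("phr_pay_id").as("phr_pay_id"),
    avg("spGrp_SCH_ID").as("spGrp_SCH_ID"), avg("spGrp_pay_id").as("spGrp_pay_id")
  )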
The first pivot returns a table like this:
+---+------+-------------+-------------+----------+----------+------------+------------+
| ID|   CAT|client_SCH_ID|client_pay_id|phr_SCH_ID|phr_pay_id|spGrp_SCH_ID|spGrp_pay_id|
+---+------+-------------+-------------+----------+----------+------------+------------+
|  1|credit|          5.0|        105.0|       4.0|     104.0|         6.0|       106.0|
|  1|   pay|          2.0|        102.0|       1.0|     101.0|         3.0|       103.0|
+---+------+-------------+-------------+----------+----------+------------+------------+
After the second pivot it looks like this:
+---+--------------------+--------------------+-----------------+-----------------+-------------------+-------------------+-----------------+-----------------+--------------+--------------+----------------+----------------+
| ID|credit_client_sch_id|credit_client_pay_id|credit_phr_SCH_ID|credit_phr_pay_id|credit_spGrp_SCH_ID|credit_spGrp_pay_id|pay_client_sch_id|pay_client_pay_id|pay_phr_SCH_ID|pay_phr_pay_id|pay_spGrp_SCH_ID|pay_spGrp_pay_id|
+---+--------------------+--------------------+-----------------+-----------------+-------------------+-------------------+-----------------+-----------------+--------------+--------------+----------------+----------------+
|  1|                 5.0|               105.0|              4.0|            104.0|                6.0|              106.0|              2.0|            102.0|           1.0|         101.0|             3.0|           103.0|
+---+--------------------+--------------------+-----------------+-----------------+-------------------+-------------------+-----------------+-----------------+--------------+--------------+----------------+----------------+
Although I'm not sure about the performance.
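The nested pivot produces one column per (CAT, SUB_CAT, measure) combination, twelve in total, while the question only needs three of them. The plain-Python sketch below models that naming scheme and picks out the three wanted columns. It is not Spark code: the rows are hypothetical values chosen to match the tables above, the column names are lower-cased for simplicity, and it assumes a single row per combination, so the avg step is a no-op and is left out.

```python
from collections import defaultdict

# Hypothetical rows (ID, CAT, SUB_CAT, SCH_ID, pay_id) matching the tables above.
rows = [
    (1, "pay", "phr", 1.0, 101.0),
    (1, "pay", "client", 2.0, 102.0),
    (1, "pay", "spGrp", 3.0, 103.0),
    (1, "credit", "phr", 4.0, 104.0),
    (1, "credit", "client", 5.0, 105.0),
    (1, "credit", "spGrp", 6.0, 106.0),
]


def nested_pivot(rows):
    """Build one wide record per ID, keyed '<CAT>_<SUB_CAT>_<measure>'."""
    wide = defaultdict(dict)
    for id_, cat, sub_cat, sch_id, pay_id in rows:
        wide[id_][f"{cat}_{sub_cat}_sch_id"] = sch_id
        wide[id_][f"{cat}_{sub_cat}_pay_id"] = pay_id
    return dict(wide)


# The three columns the question asks for, out of the twelve produced.
WANTED = {
    "Field1": "pay_client_sch_id",
    "Field2": "pay_phr_sch_id",
    "Field3": "credit_spGrp_pay_id",
}

wide = nested_pivot(rows)
fields = {
    id_: {name: cols.get(col) for name, col in WANTED.items()}
    for id_, cols in wide.items()
}
print(fields)
```

In the Spark version, the equivalent step would be a select on those three columns (renamed to Field1, Field2, Field3) before joining the result to MasterDF on the id.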