我如何通过不同级别的枢轴聚合然后在 pyspark 中进行内部连接?

How can I aggregate by different levels pivot then inner join in pyspark?

我是 spark 的新手,我有一个包含事务数据的数据框。我想按人员 ID 分组,但按不同的属性分组,例如店铺类型、教育程度。

%%spark
from pyspark.sql.functions import lit
from functools import reduce
from pyspark.sql import DataFrame
from pyspark.sql import functions as sf 

df = spark.sql("SELECT * FROM  df  limit 10")

shoptypes = df.select('shoptype').distinct().rdd.map(lambda r: r[0]).collect()

edulevel = df.select('edulevel').distinct().rdd.map(lambda r: r[0]).collect()

pivot_1 = df.groupBy("id").pivot("shoptype", shoptypes).sum("amount")
pivot_1 .show()


pivot_2 = df.groupBy("id").pivot("edulevel", edulevel).count()
pivot_2 .show()

alldfs = pivot_1.join(pivot_2, pivot_2.id == pivot_1.id, how='inner').drop(pivot_2.id)

我内部联接后alldfs 为空。当 pivot 1 和 pivot 2 具有相同的 ids 时,这怎么可能?

然而我觉得奇怪的是 pivot_1 和 pivot_2 不显示相同的人 ID - 我认为他们会因为它们是从相同的来源创建的?我不知道发生了什么。有人可以帮忙吗?我基本上想聚合不同的 attrubytes 和 pivot,然后水平加入 person id。所以我最终将行作为我的 ID,将列作为旋转属性。

Select 语句中的 Limit 子句不是确定性的。由于 Spark 是延迟计算的,SQL 语句将执行两次,您将在 pivot_1 和 pivot_2.

中获得不同的 id