pyspark pivot function unable to create column: Cannot resolve column name "In" among (A, Out)

I have an input dataframe df with the following columns: A, B, C.

| A | B  | C  |
|---|----|----|
| 1 | a1 | x1 |
| 1 | a1 | x2 |
| 1 | a2 | x3 |
| 1 | a3 | -  |
| 2 | a4 | x4 |
| 2 | a5 | x5 |
| 2 | a6 | x6 |
| 2 | a6 | x7 |
| 2 | a6 | x8 |
| 2 | a7 | -  |

I am creating a pyspark dataframe from it as follows:

from pyspark.sql import functions as F
df1 = df.groupBy("A", "B")\
        .agg(F.countDistinct("C").alias("UniqueCount"))\
        .withColumn("InOrOut", F.when(F.col("UniqueCount") == 0, F.lit("Out"))\
                               .otherwise(F.lit("In")))

df1 comes out as:

| A | B  | UniqueCount | InOrOut |
|---|----|-------------|---------|
| 1 | a1 | 2           | In      |
| 1 | a2 | 1           | In      |
| 1 | a3 | 0           | Out     |
| 2 | a4 | 1           | In      |
| 2 | a5 | 1           | In      |
| 2 | a6 | 3           | In      |
| 2 | a7 | 0           | Out     |

I then pivot it as follows:

    df2 = df1.groupBy("A")\
             .pivot("InOrOut")\
             .agg(*[F.countDistinct(F.col(x)).alias(x) for x in ["B"]])\
             .na.fill(value=0, subset=["In", "Out"])

I expected df2 to be:

| A | In | Out |
|---|----|-----|
| 1 | 2  | 1   |
| 2 | 3  | 1   |

Instead, I ran into the following error: Cannot resolve column name "In" among (A, Out). I have verified that df1 is created and has data for both conditions, i.e. where UniqueCount == 0 as well as where UniqueCount <> 0.

It works as expected -

Spark - v3.1.2

Data Preparation

from io import StringIO

import pandas as pd

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# `sql` is the SparkSession used by createDataFrame below
sql = SparkSession.builder.getOrCreate()

s = StringIO("""
A,B,C
1,a1,x1 
1,a1,x2 
1,a2,x3 
1,a3,None
2,a4,x4 
2,a5,x5 
2,a6,x6 
2,a6,x7 
2,a6,x8 
2,a7,None
""")

df = pd.read_csv(s, delimiter=',')

# Replace the literal string 'None' with a real null: countDistinct
# ignores SQL nulls, not the string 'None', so without this step every
# group would get UniqueCount >= 1 and be labelled "In"
sparkDF = sql.createDataFrame(df).withColumn(
    'C',
    F.when(F.col('C') == 'None', F.lit(None)).otherwise(F.col('C'))
)

sparkDF.show()

+---+---+----+
|  A|  B|   C|
+---+---+----+
|  1| a1| x1 |
|  1| a1| x2 |
|  1| a2| x3 |
|  1| a3|null|
|  2| a4| x4 |
|  2| a5| x5 |
|  2| a6| x6 |
|  2| a6| x7 |
|  2| a6| x8 |
|  2| a7|null|
+---+---+----+

Aggregation

sparkDF_agg = sparkDF.groupBy("A", "B")\
                     .agg(F.countDistinct("C").alias("UniqueCount"))\
                     .withColumn("InOrOut", F.when(F.col("UniqueCount") == 0, F.lit("Out"))\
                                            .otherwise(F.lit("In")))


sparkDF_agg.show()

+---+---+-----------+-------+
|  A|  B|UniqueCount|InOrOut|
+---+---+-----------+-------+
|  1| a3|          0|    Out|
|  1| a1|          2|     In|
|  2| a5|          1|     In|
|  2| a6|          3|     In|
|  2| a4|          1|     In|
|  2| a7|          0|    Out|
|  1| a2|          1|     In|
+---+---+-----------+-------+
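
Both labels, In and Out, show up in InOrOut here, which is why the pivot below succeeds. If only your real dataset fails, it is worth checking which labels actually occur, because pivot only creates columns for the values it finds in the data. A quick check, using the same names as above:

sparkDF_agg.select("InOrOut").distinct().show()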

Pivot

sparkDF_agg.groupBy("A")\
           .pivot("InOrOut")\
           .agg(*[F.countDistinct(F.col(x)).alias(x) for x in ["B"]])\
           .na.fill(value=0, subset=["In", "Out"])\
           .show()

+---+---+---+
|  A| In|Out|
+---+---+---+
|  1|  2|  1|
|  2|  3|  1|
+---+---+---+
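
Note that the error message itself lists the columns that do exist, (A, Out): in the dataset that failed, the pivot apparently found no In rows, so no In column was created and the trailing na.fill(subset=["In", "Out"]) could not resolve it. One safeguard, assuming your real data may contain only one of the two labels, is to pass the expected values to pivot explicitly; Spark then creates a column per listed value regardless of the data (and skips the extra pass it otherwise needs to collect the distinct values). A minimal sketch:

# Listing the pivot values guarantees both columns exist even when a
# label is absent from the data; missing cells come back as null and
# are then zero-filled by na.fill
sparkDF_agg.groupBy("A")\
           .pivot("InOrOut", ["In", "Out"])\
           .agg(*[F.countDistinct(F.col(x)).alias(x) for x in ["B"]])\
           .na.fill(value=0, subset=["In", "Out"])\
           .show()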