Pyspark: Create new column of the set of values in a groupby

I have a PySpark dataframe like this:

import pandas as pd

df = pd.DataFrame({"Date": ["2020-05-10", "2020-05-10", "2020-05-10", "2020-05-11", "2020-05-11", "2020-05-12"], "Mode": ['A', 'B', 'A', 'C', 'C', 'B']})

df = spark.createDataFrame(df)

+----------+----+
|      Date|Mode|
+----------+----+
|2020-05-10|   A|
|2020-05-10|   B|
|2020-05-10|   A|
|2020-05-11|   C|
|2020-05-11|   C|
|2020-05-12|   B|
+----------+----+

I want to group by Date and create a new column containing the set of values in the Mode column, like this:

df = pd.DataFrame({"Date": ["2020-05-10", "2020-05-10", "2020-05-10", "2020-05-11", "2020-05-11", "2020-05-12", ], "Mode": ['A', 'B', 'A', 'C', 'C', 'B'], "set(Mode)": [['A', 'B'], ['A', 'B'], ['A', 'B'], ['C'], ['C'], ['B']]})

df = spark.createDataFrame(df)

+----------+----+---------+
|      Date|Mode|set(Mode)|
+----------+----+---------+
|2020-05-10|   A|   [A, B]|
|2020-05-10|   B|   [A, B]|
|2020-05-10|   A|   [A, B]|
|2020-05-11|   C|      [C]|
|2020-05-11|   C|      [C]|
|2020-05-12|   B|      [B]|
+----------+----+---------+

You can try collect_set over a window:

import pyspark.sql.functions as F
from pyspark.sql import Window

df.withColumn("Set", F.collect_set('Mode')
                      .over(Window.partitionBy("Date"))).orderBy("Date").show()

+----------+----+------+
|      Date|Mode|   Set|
+----------+----+------+
|2020-05-10|   A|[B, A]|
|2020-05-10|   A|[B, A]|
|2020-05-10|   B|[B, A]|
|2020-05-11|   C|   [C]|
|2020-05-11|   C|   [C]|
|2020-05-12|   B|   [B]|
+----------+----+------+

If preserving the original row order matters:

(df.withColumn("idx",F.monotonically_increasing_id())
   .withColumn("Set",F.collect_set('Mode').over(Window.partitionBy("Date")))
   .orderBy("idx").drop("idx")).show()

+----------+----+------+
|      Date|Mode|   Set|
+----------+----+------+
|2020-05-10|   A|[B, A]|
|2020-05-10|   B|[B, A]|
|2020-05-10|   A|[B, A]|
|2020-05-11|   C|   [C]|
|2020-05-11|   C|   [C]|
|2020-05-12|   B|   [B]|
+----------+----+------+
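
Note that collect_set does not guarantee the order of the elements inside the resulting array, which is why the set shows up as [B, A] above. If you also want the array contents in a deterministic order, a minimal sketch (assuming Spark 2.4+, where F.array_sort is available) would be:

import pyspark.sql.functions as F
from pyspark.sql import Window

# Sort the collected set so the array contents are deterministic, e.g. [A, B] instead of [B, A]
(df.withColumn("Set", F.array_sort(F.collect_set('Mode').over(Window.partitionBy("Date"))))
   .orderBy("Date")
   .show())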

You can also try the code below:

# Import libraries
import pandas as pd
from pyspark.sql.functions import collect_set

# Create DataFrame
df = pd.DataFrame({"Date": ["2020-05-10", "2020-05-10", "2020-05-10", "2020-05-11", "2020-05-11", "2020-05-12"], "Mode": ['A', 'B', 'A', 'C', 'C', 'B']})
df = spark.createDataFrame(df)

# Group by Date and collect the values as a set using the collect_set function.
df1 = df.groupBy("Date").agg(collect_set("Mode"))

# Join the DataFrames to get desired result.
df2 = df.join(df1, "Date")

# Display DataFrame
df2.show()

Output

+----------+----+-----------------+
|      Date|Mode|collect_set(Mode)|
+----------+----+-----------------+
|2020-05-11|   C|              [C]|
|2020-05-11|   C|              [C]|
|2020-05-10|   A|           [B, A]|
|2020-05-10|   B|           [B, A]|
|2020-05-10|   A|           [B, A]|
|2020-05-12|   B|              [B]|
+----------+----+-----------------+
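
The aggregated column is named collect_set(Mode) by default. If you want it to match the set(Mode) column from the desired output, one option (a small sketch using the standard alias method) is to rename it during the aggregation:

from pyspark.sql.functions import collect_set

# Alias the aggregated column so the joined result uses the desired column name
df1 = df.groupBy("Date").agg(collect_set("Mode").alias("set(Mode)"))
df2 = df.join(df1, "Date")
df2.show()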

Hope this helps.