Pyspark 合并数据帧后的列总和

Pyspark sum of columns after union of dataframe

合并两个数据框后如何对所有列求和?

我有第一个 df,每个用户一行:

df = sqlContext.createDataFrame([("2022-01-10", 3, 2,"a"),("2022-01-10",3,4,"b"),("2022-01-10", 1,3,"c")], ["date", "value1", "value2", "userid"])
df.show()

+----------+------+------+------+
|      date|value1|value2|userid|
+----------+------+------+------+
|2022-01-10|     3|     2|     a|
|2022-01-10|     3|     4|     b|
|2022-01-10|     1|     3|     c|
+----------+------+------+------+

日期值始终是今天的日期。

我还有另一个 df,这次每个用户 ID 有多行,所以每天一个值:

df2 = sqlContext.createDataFrame([("2022-01-01", 13, 12,"a"),("2022-01-02",13,14,"b"),("2022-01-03", 11,13,"c"),
                                 ("2022-01-04", 3, 2,"a"),("2022-01-05",3,4,"b"),("2022-01-06", 1,3,"c"),
                                 ("2022-01-10", 31, 21,"a"),("2022-01-07",31,41,"b"),("2022-01-09", 11,31,"c")], ["date", "value3", "value4", "userid"])
df2.show()

+----------+------+------+------+
|      date|value3|value4|userid|
+----------+------+------+------+
|2022-01-01|    13|    12|     a|
|2022-01-02|    13|    14|     b|
|2022-01-03|    11|    13|     c|
|2022-01-04|     3|     2|     a|
|2022-01-05|     3|     4|     b|
|2022-01-06|     1|     3|     c|
|2022-01-10|    31|    21|     a|
|2022-01-07|    31|    41|     b|
|2022-01-09|    11|    31|     c|
+----------+------+------+------+

将他们两个与这个函数合并后,我得到的是:

def union_different_tables(df1, df2):
    columns_df1 = df1.columns
    columns_df2 = df2.columns
    data_types_df1 = [i.dataType for i in df1.schema.fields]
    data_types_df2 = [i.dataType for i in df2.schema.fields]
    
    for col, _type in zip(columns_df1, data_types_df1):
        if col not in df2.columns:
            df2 = df2.withColumn(col, f.lit(None).cast(_type))
    for col, _type in zip(columns_df2, data_types_df2):
        if col not in df1.columns:
            df1 = df1.withColumn(col, f.lit(None).cast(_type))
    union = df1.unionByName(df2)
    return union

+----------+------+------+------+------+------+
|      date|value1|value2|userid|value3|value4|
+----------+------+------+------+------+------+
|2022-01-10|     3|     2|     a|  null|  null|
|2022-01-10|     3|     4|     b|  null|  null|
|2022-01-10|     1|     3|     c|  null|  null|
|2022-01-01|  null|  null|     a|    13|    12|
|2022-01-02|  null|  null|     b|    13|    14|
|2022-01-03|  null|  null|     c|    11|    13|
|2022-01-04|  null|  null|     a|     3|     2|
|2022-01-05|  null|  null|     b|     3|     4|
|2022-01-06|  null|  null|     c|     1|     3|
|2022-01-10|  null|  null|     a|    31|    21|
|2022-01-07|  null|  null|     b|    31|    41|
|2022-01-09|  null|  null|     c|    11|    31|
+----------+------+------+------+------+------+

我想要得到的是 df2 中所有列的总和(在实际情况下我有 10 列)直到每一天的日期 userid,所以一行每个用户:

 +----------+------+------+------+------+------+
|      date|value1|value2|userid|value3|value4|
+----------+------+------+------+------+------+
|2022-01-10|     3|     2|     a|  47  |  35  |
|2022-01-10|     3|     4|     b|  47  |  59  |
|2022-01-10|     1|     3|     c|  23  |  47  |
+----------+------+------+------+------+------+

由于我必须对多个表执行此操作,这里我尝试了:

user_window = Window.partitionBy(['userid']).orderBy('date')
    
list_tables = [df2]

list_col_original = df.columns

for table in list_tables:
    df = union_different_tables(df, table)
    list_column = list(set(table.columns) - set(list_col_original))

    list_col_original.extend(list_column)

    df = df.select('userid',
            *[f.sum(f.col(col_name)).over(user_window).alias(col_name) for col_name in list_column])
df.show()

+------+------+------+
|userid|value4|value3|
+------+------+------+
|     c|    13|    11|
|     c|    16|    12|
|     c|    47|    23|
|     c|    47|    23|
|     b|    14|    13|
|     b|    18|    16|
|     b|    59|    47|
|     b|    59|    47|
|     a|    12|    13|
|     a|    14|    16|
|     a|    35|    47|
|     a|    35|    47|
+------+------+------+

但这给了我一种累积总和,而且我没有找到将所有列添加到结果 df 中的方法。

唯一的问题是我无法加入!我的 df 非常非常大,任何连接的计算时间都太长了。

你知道我如何修改我的代码以获得我想要的结果吗?

合并 df1df2 后,您可以按 userid 分组并对除 date 之外的所有列求和,得到最大值。

请注意,对于并集部分,如果您具有相同的数据类型但只有列数可以不同,您实际上可以使用 DataFrame.unionByName

df = df1.unionByName(df2, allowMissingColumns=True)

然后分组并聚合:

import pyspark.sql.functions as F

result = df.groupBy("userid").agg(
    F.max("date").alias("date"),
    *[F.sum(c).alias(c) for c in df.columns if c not in ("date", "userid")]
)

result.show()

#+------+----------+------+------+------+------+
#|userid|      date|value1|value2|value3|value4|
#+------+----------+------+------+------+------+
#|     a|2022-01-10|     3|     2|    47|    35|
#|     b|2022-01-10|     3|     4|    47|    59|
#|     c|2022-01-10|     1|     3|    23|    47|
#+------+----------+------+------+------+------+

这假设第二个数据框只包含第一个数据框中今天日期之前的日期。否则,您需要在联合之前过滤 df2