Pyspark groupby 将 JSON 合并为单个对象

Pypsark groupby merge JSON into single object

我有这个数据框。

uid status   count amount
14  success  1     50
14  failed   2     60

这是我想要的输出

uid   result
14    {"successful_count": 1, "successful_amount": 50, "failure_count": 2, 
      "failure_amount": 60}

到目前为止我已经试过了。

schema = StructType([ \
        StructField("successful_amount",DoubleType(),True), \
        StructField("successful_count",IntegerType(),True), \
        StructField("failure_amount",DoubleType(),True), \
        StructField("failure_count",IntegerType(),True)
      ])
    
df_amount.withColumn("res", when(col("transaction_status") == "success", to_json(struct(col("amount").alias("successful_amount"), col("count").alias("successful_count"))))\
                                  .when(col("transaction_status") == "failed", to_json(struct(col("amount").alias("failed_amount"), col("count").alias("failed_count")))))\
         .withColumn("res", from_json(col("per_method"), schema)) 

这又添加了一个结构列 'res' 但现在我不确定如何分组并将两个 struct/JSON 组合成一个对象。我需要编写自定义聚合函数吗? 有什么easier/better方法可以得到输出吗?

选项 1:

此方法使用聚合函数将它们组合在一起(考虑它们的 uid),并使用 udf 函数获取您要查找的字典:

from pyspark.sql.functions import udf, collect_list

# First, group the pertinent columns depending on the uid
grouped_df = df_amount.groupby('uid').agg(collect_list('status').alias("name"), collect_list('count').alias("count"), collect_list('amount').alias("amount"))
# resulting on a pyspark dataframe with each possible value in the same row
grouped_df.show()

这是生成的 pyspark 数据帧:

+---+-----------------+------+--------+
|uid|             name| count|  amount|
+---+-----------------+------+--------+
| 14|[success, failed]|[1, 2]|[50, 60]|
+---+-----------------+------+--------+

之后,创建包含您要查找的词典的列:

# Create our customize function to create the dictionary using the values of each column (list)
@udf(StringType())
def new_column(name, count, amount):
    nr = dict()
    for i in range(len(name)):        
        nr[name[i] + "_amount"] = amount[i]
        nr[name[i] + "_count"] = count[i]
    return nr

result = grouped_df.withColumn("result", new_column(grouped_df['name'], grouped_df['count'], grouped_df['amount'])).select("uid", "result")
result.show(truncate = False)

结果数据框:

+---+----------------------------------------------------------------------+
|uid|result                                                                |
+---+----------------------------------------------------------------------+
|14 |{success_count=1, failed_count=2, success_amount=50, failed_amount=60}|
+---+----------------------------------------------------------------------+

选项 2:

您还可以创建一个数据框,连接具有相同 uid 和不同状态的行。之后,使用 res 列创建一个新的 pyspark 数据框,使其适应您正在寻找的模式:

# Filter the dataframe for the different values of the column status and join them together to get all the different values in the same row
# You can also rename the columns if necessary
df_amount_joined = df_amount.filter(col("status") == "success").withColumnRenamed("count", "successful_count").withColumnRenamed("amount", "successful_amount").join(df_amount.filter(col("status") == "failed").withColumnRenamed("count", "failure_count").withColumnRenamed("amount", "failure_amount"), on = "uid", how= "left").drop("status")
df_amount_joined.show()

获取结果数据帧:

+---+----------------+-----------------+-------------+--------------+
|uid|successful_count|successful_amount|failure_count|failure_amount|
+---+----------------+-----------------+-------------+--------------+
| 14|               1|               50|            2|            60|
+---+----------------+-----------------+-------------+--------------+

像您在示例中所做的那样,使用 struct 创建最后一列:

# Finally create the column as you did in your example
df_final = df_amount_joined.withColumn("res", to_json(struct(col("successful_count"), col("successful_amount"), col("failure_count"), col("failure_amount")))).select("uid", "res")
df_final.show(truncate = False)

这将为您留下您正在寻找的数据框:

+---+-----------------------------------------------------------------------------------+
|uid|res                                                                                |
+---+-----------------------------------------------------------------------------------+
|14 |{"successful_count":1,"successful_amount":50,"failure_count":2,"failure_amount":60}|
+---+-----------------------------------------------------------------------------------+