Pyspark groupby merge JSON into single object
I have this dataframe:
uid status count amount
14 success 1 50
14 failed 2 60
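(For reference, a minimal sketch to reproduce this sample, assuming an active SparkSession named spark; the integer amounts are taken from the sample as shown:)

# Hypothetical setup for the sample data above, for reproducibility only
df_amount = spark.createDataFrame(
    [(14, "success", 1, 50), (14, "failed", 2, 60)],
    ["uid", "status", "count", "amount"],
)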
This is the output I want:
uid result
14  {"successful_count": 1, "successful_amount": 50, "failure_count": 2, "failure_amount": 60}
This is what I have tried so far:
from pyspark.sql.functions import col, from_json, struct, to_json, when
from pyspark.sql.types import DoubleType, IntegerType, StructField, StructType

schema = StructType([
    StructField("successful_amount", DoubleType(), True),
    StructField("successful_count", IntegerType(), True),
    StructField("failure_amount", DoubleType(), True),
    StructField("failure_count", IntegerType(), True)
])

df_amount.withColumn(
    "res",
    when(col("status") == "success",
         to_json(struct(col("amount").alias("successful_amount"),
                        col("count").alias("successful_count"))))
    .when(col("status") == "failed",
          to_json(struct(col("amount").alias("failure_amount"),
                         col("count").alias("failure_count"))))
).withColumn("res", from_json(col("res"), schema))
This adds another struct column 'res', but now I'm not sure how to group by uid and combine the two structs/JSON into a single object. Do I need to write a custom aggregation function?
Is there an easier/better way to get this output?
Option 1:
This approach combines the rows with an aggregation function (grouping by their uid) and then builds the dictionary you are looking for with a udf:
from pyspark.sql.functions import udf, collect_list

# First, group the relevant columns by uid
grouped_df = df_amount.groupby('uid').agg(
    collect_list('status').alias("name"),
    collect_list('count').alias("count"),
    collect_list('amount').alias("amount"))
# resulting in a pyspark dataframe with every value for a uid in the same row
grouped_df.show()
This is the resulting pyspark dataframe:
+---+-----------------+------+--------+
|uid| name| count| amount|
+---+-----------------+------+--------+
| 14|[success, failed]|[1, 2]|[50, 60]|
+---+-----------------+------+--------+
After that, create the column containing the dictionary you are looking for:
# Create a custom function that builds the dictionary from the values of each column (list)
from pyspark.sql.types import StringType

@udf(StringType())
def new_column(name, count, amount):
    nr = dict()
    for i in range(len(name)):
        nr[name[i] + "_amount"] = amount[i]
        nr[name[i] + "_count"] = count[i]
    return nr
result = grouped_df.withColumn(
    "result",
    new_column(grouped_df['name'], grouped_df['count'], grouped_df['amount'])
).select("uid", "result")
result.show(truncate = False)
Resulting dataframe:
+---+----------------------------------------------------------------------+
|uid|result |
+---+----------------------------------------------------------------------+
|14 |{success_count=1, failed_count=2, success_amount=50, failed_amount=60}|
+---+----------------------------------------------------------------------+
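Note that the udf above returns a Python dict, so the string shown in the result column is not strictly valid JSON. If you need proper JSON text, as in your expected output, a minimal tweak is to serialize the dict before returning it (the name new_column_json is just for illustration):

import json

# Same logic as above, but returns a proper JSON string
@udf(StringType())
def new_column_json(name, count, amount):
    nr = dict()
    for i in range(len(name)):
        nr[name[i] + "_amount"] = amount[i]
        nr[name[i] + "_count"] = count[i]
    return json.dumps(nr)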
Option 2:
You can also build a dataframe by joining the rows that share a uid but have different statuses. After that, create a new pyspark dataframe with a res column that matches the schema you are looking for:
# Filter the dataframe on each value of the status column and join the results
# to get all the values for a uid in the same row (rename the columns as needed)
df_amount_joined = (df_amount.filter(col("status") == "success")
    .withColumnRenamed("count", "successful_count")
    .withColumnRenamed("amount", "successful_amount")
    .join(df_amount.filter(col("status") == "failed")
              .withColumnRenamed("count", "failure_count")
              .withColumnRenamed("amount", "failure_amount"),
          on="uid", how="left")
    .drop("status"))
df_amount_joined.show()
This gives the resulting dataframe:
+---+----------------+-----------------+-------------+--------------+
|uid|successful_count|successful_amount|failure_count|failure_amount|
+---+----------------+-----------------+-------------+--------------+
| 14| 1| 50| 2| 60|
+---+----------------+-----------------+-------------+--------------+
Then create the last column with struct, as you did in your example:
# Finally, create the column as you did in your example
df_final = df_amount_joined.withColumn(
    "res",
    to_json(struct(col("successful_count"), col("successful_amount"),
                   col("failure_count"), col("failure_amount")))
).select("uid", "res")
df_final.show(truncate = False)
This leaves you with the dataframe you are looking for:
+---+-----------------------------------------------------------------------------------+
|uid|res |
+---+-----------------------------------------------------------------------------------+
|14 |{"successful_count":1,"successful_amount":50,"failure_count":2,"failure_amount":60}|
+---+-----------------------------------------------------------------------------------+
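As a side note, you can get the same flattened layout without a self-join by pivoting on status. This is a sketch under the assumption that status only takes the values 'success' and 'failed'; with multiple aggregations, pivot names the output columns by combining the pivot value and the aggregation alias:

from pyspark.sql import functions as F

pivoted = (df_amount
    .groupBy("uid")
    .pivot("status", ["success", "failed"])
    .agg(F.first("count").alias("count"), F.first("amount").alias("amount")))
# pivoted now has columns: success_count, success_amount, failed_count, failed_amount
df_final = pivoted.select(
    "uid",
    F.to_json(F.struct(
        F.col("success_count").alias("successful_count"),
        F.col("success_amount").alias("successful_amount"),
        F.col("failed_count").alias("failure_count"),
        F.col("failed_amount").alias("failure_amount"))).alias("res"))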