PySpark - 获取数据框中动态列的聚合值
PySpark - Get aggregated values for dynamic columns in a dataframe
我有一个包含以下行的数据框:
+------+--------+-------+-------+
| label| machine| value1| value2|
+------+--------+-------+-------+
|label1|machine1| 13| 7.5|
|label1|machine1| 9 | 7.5|
|label1|machine1| 8.5| 7.5|
|label1|machine1| 10.5| 7.5|
|label1|machine1| 12| 8|
|label1|machine2| 8 | 13.5|
|label1|machine2| 18| 10|
|label1|machine2| 10| 14|
|label1|machine2| 9 | 10.5|
|label1|machine2| 8.5| 10|
|label2|machine3| 8 | 7.5|
|label2|machine3| 18| 7.5|
|label2|machine3| 10| 7.5|
|label2|machine3| 9 | 7.5|
|label2|machine3| 8.5| 8|
|label2|machine4| 13.5| 13|
|label2|machine4| 10| 9|
|label2|machine4| 14| 8.5|
|label2|machine4| 10.5| 10.5|
|label2|machine4| 10| 12|
+------+--------+-------+-------+
在这里,我可以在数据框中包含 value1, value2
以外的多个值列。对于每一列,我想用 collect_list
聚合值并在数据框中创建一个新列,以便稍后执行一些功能。
为此,我试过这样:
my_df = my_df.groupBy(['label', 'machine']). \
agg(collect_list("value1").alias("col_value1"), collect_list("value2").alias("col_value2"))
它给了我以下 4 行,因为我按 label
和 machine
列分组。
+------+--------+--------------------+--------------------+
| label| machine| collected_value1| collected_value2|
+------+--------+--------------------+--------------------+
|label1|machine1|[13.0, 9.0, 8.5, ...|[7.5, 7.5, 7.5, 7...|
|label2|machine2|[8.0, 18.0, 10.0,...|[13.5, 10.0, 14, ...|
|label1|machine3|[8.0, 18.0, 10.0,...|[7.5, 7.5, 7.5, 7...|
|label2|machine4|[13.5, 10.0, 14, ...|[13.0, 9.0, 8.5, ...|
+------+--------+--------------------+--------------------+
现在,我的问题是如何将列动态传递给该组依据。每个运行的列可能不同,所以我想使用这样的东西:
df_cols = ['value1', 'value2']
my_df = my_df.groupBy(['label', 'machine']). \
agg(collect_list(col_name).alias(str(col_name+"_collected")) for col_name in df_cols)
它给我 AssertionError: all exprs should be Column
错误。
我怎样才能做到这一点?有人可以帮我解决这个问题吗?
提前致谢。
以下代码有效。谢谢。
exprs = [collect_list(x).alias(str(x+"_collected")) for x in df_cols]
my_df = my_df.groupBy(['label', 'machine']).agg(*exprs)
我有一个包含以下行的数据框:
+------+--------+-------+-------+
| label| machine| value1| value2|
+------+--------+-------+-------+
|label1|machine1| 13| 7.5|
|label1|machine1| 9 | 7.5|
|label1|machine1| 8.5| 7.5|
|label1|machine1| 10.5| 7.5|
|label1|machine1| 12| 8|
|label1|machine2| 8 | 13.5|
|label1|machine2| 18| 10|
|label1|machine2| 10| 14|
|label1|machine2| 9 | 10.5|
|label1|machine2| 8.5| 10|
|label2|machine3| 8 | 7.5|
|label2|machine3| 18| 7.5|
|label2|machine3| 10| 7.5|
|label2|machine3| 9 | 7.5|
|label2|machine3| 8.5| 8|
|label2|machine4| 13.5| 13|
|label2|machine4| 10| 9|
|label2|machine4| 14| 8.5|
|label2|machine4| 10.5| 10.5|
|label2|machine4| 10| 12|
+------+--------+-------+-------+
在这里,我可以在数据框中包含 value1, value2
以外的多个值列。对于每一列,我想用 collect_list
聚合值并在数据框中创建一个新列,以便稍后执行一些功能。
为此,我试过这样:
my_df = my_df.groupBy(['label', 'machine']). \
agg(collect_list("value1").alias("col_value1"), collect_list("value2").alias("col_value2"))
它给了我以下 4 行,因为我按 label
和 machine
列分组。
+------+--------+--------------------+--------------------+
| label| machine| collected_value1| collected_value2|
+------+--------+--------------------+--------------------+
|label1|machine1|[13.0, 9.0, 8.5, ...|[7.5, 7.5, 7.5, 7...|
|label2|machine2|[8.0, 18.0, 10.0,...|[13.5, 10.0, 14, ...|
|label1|machine3|[8.0, 18.0, 10.0,...|[7.5, 7.5, 7.5, 7...|
|label2|machine4|[13.5, 10.0, 14, ...|[13.0, 9.0, 8.5, ...|
+------+--------+--------------------+--------------------+
现在,我的问题是如何将列动态传递给该组依据。每个运行的列可能不同,所以我想使用这样的东西:
df_cols = ['value1', 'value2']
my_df = my_df.groupBy(['label', 'machine']). \
agg(collect_list(col_name).alias(str(col_name+"_collected")) for col_name in df_cols)
它给我 AssertionError: all exprs should be Column
错误。
我怎样才能做到这一点?有人可以帮我解决这个问题吗?
提前致谢。
以下代码有效。谢谢。
exprs = [collect_list(x).alias(str(x+"_collected")) for x in df_cols]
my_df = my_df.groupBy(['label', 'machine']).agg(*exprs)