如何循环遍历数据框并将数组添加到每一行
How to Loop through dataframe and add array to each row
我需要遍历 pyspark 数据框并在活跃月份数中爆破每一行。我主要关心的是当我尝试将数据放入配置单元时的内存管理和过程中消耗的时间。
我用 "idx", F.monotonically_increasing_id()
收集
但这已经破坏了我的代码的性能。
输入
Empid date_active date_end
1234 2012-01-01 2012-10-27
2345 2012-01-01 2012-12-31
3456 2012-01-01 2012-08-15
输出
EmpId effective_Month
1234 Jan-12
1234 Feb-12
1234 ....
1234 Oct-12
2345 Jan-12
2345 Feb-12
2345 ....
2345 Dec-12
通常,我会建议尝试使用udf。我曾经遇到过类似的问题,但我认为解决方案仍然太复杂。
相反,您可以改变对问题的看法。当您知道所有 EmpIds
中的 min(date_active)
和 max(date_end)
时,您可以遍历所有月份,例如'yyyy-mm-01'
并将其保存为数据框。
现在,您(广播)将生成的数据帧与 table 的每一行连接起来。最后你只需要一个简单的过滤器,如:when effective_month between date_active and date_end
。
最后你应该把effective_month
转换成你喜欢的字符串格式。
广播连接非常快,如果广播 table 很小,这样运行时应该不是问题。
您可以在数据框 API
中按以下方式解决此问题
创建示例数据框
df = spark.createDataFrame([["123","2012-01-01","2012-10-01"],['234', '2012-01-01', '2012-05-01'],["345","2012-01-01","2012-11-01"]], ("age","date_active", "date_end"))
+---+-----------+----------+
|age|date_active| date_end|
+---+-----------+----------+
|123| 2012-01-01|2012-10-01|
|234| 2012-01-01|2012-05-01|
|345| 2012-01-01|2012-11-01|
+---+-----------+----------+
将数据类型从字符串更改为时间戳
df = df.withColumn('date_active', df['date_active'].cast('timestamp'))\
.withColumn('date_end', df['date_end'].cast('timestamp'))
使用下面的代码添加月份列
from pyspark.sql import functions as f
df.withColumn('month_diff', f.months_between('date_end', 'date_active')).withColumn("repeat", f.expr("split(repeat(',', month_diff), ',')"))\
.select("*", f.posexplode("repeat").alias("date", "val")).withColumn("date", f.expr("add_months(date_active, date)"))\
.withColumn('month', f.date_format('date','MMM')).select(['age', 'date_active', 'date_end', 'month']).show()
---+-----------+----------+-----+
|age|date_active| date_end|month|
+---+-----------+----------+-----+
|123| 2012-01-01|2012-10-01| Jan|
|123| 2012-01-01|2012-10-01| Feb|
|123| 2012-01-01|2012-10-01| Mar|
|123| 2012-01-01|2012-10-01| Apr|
|123| 2012-01-01|2012-10-01| May|
|123| 2012-01-01|2012-10-01| Jun|
|123| 2012-01-01|2012-10-01| Jul|
|123| 2012-01-01|2012-10-01| Aug|
|123| 2012-01-01|2012-10-01| Sep|
|123| 2012-01-01|2012-10-01| Oct|
|234| 2012-01-01|2012-05-01| Jan|
|234| 2012-01-01|2012-05-01| Feb|
|234| 2012-01-01|2012-05-01| Mar|
|234| 2012-01-01|2012-05-01| Apr|
|234| 2012-01-01|2012-05-01| May|
|345| 2012-01-01|2012-11-01| Jan|
|345| 2012-01-01|2012-11-01| Feb|
|345| 2012-01-01|2012-11-01| Mar|
|345| 2012-01-01|2012-11-01| Apr|
|345| 2012-01-01|2012-11-01| May|
+---+-----------+----------+-----+
我需要遍历 pyspark 数据框并在活跃月份数中爆破每一行。我主要关心的是当我尝试将数据放入配置单元时的内存管理和过程中消耗的时间。
我用 "idx", F.monotonically_increasing_id()
收集
但这已经破坏了我的代码的性能。
输入
Empid date_active date_end
1234 2012-01-01 2012-10-27
2345 2012-01-01 2012-12-31
3456 2012-01-01 2012-08-15
输出
EmpId effective_Month
1234 Jan-12
1234 Feb-12
1234 ....
1234 Oct-12
2345 Jan-12
2345 Feb-12
2345 ....
2345 Dec-12
通常,我会建议尝试使用udf。我曾经遇到过类似的问题,但我认为解决方案仍然太复杂。
相反,您可以改变对问题的看法。当您知道所有 EmpIds
中的 min(date_active)
和 max(date_end)
时,您可以遍历所有月份,例如'yyyy-mm-01'
并将其保存为数据框。
现在,您(广播)将生成的数据帧与 table 的每一行连接起来。最后你只需要一个简单的过滤器,如:when effective_month between date_active and date_end
。
最后你应该把effective_month
转换成你喜欢的字符串格式。
广播连接非常快,如果广播 table 很小,这样运行时应该不是问题。
您可以在数据框 API
中按以下方式解决此问题创建示例数据框
df = spark.createDataFrame([["123","2012-01-01","2012-10-01"],['234', '2012-01-01', '2012-05-01'],["345","2012-01-01","2012-11-01"]], ("age","date_active", "date_end"))
+---+-----------+----------+
|age|date_active| date_end|
+---+-----------+----------+
|123| 2012-01-01|2012-10-01|
|234| 2012-01-01|2012-05-01|
|345| 2012-01-01|2012-11-01|
+---+-----------+----------+
将数据类型从字符串更改为时间戳
df = df.withColumn('date_active', df['date_active'].cast('timestamp'))\
.withColumn('date_end', df['date_end'].cast('timestamp'))
使用下面的代码添加月份列
from pyspark.sql import functions as f
df.withColumn('month_diff', f.months_between('date_end', 'date_active')).withColumn("repeat", f.expr("split(repeat(',', month_diff), ',')"))\
.select("*", f.posexplode("repeat").alias("date", "val")).withColumn("date", f.expr("add_months(date_active, date)"))\
.withColumn('month', f.date_format('date','MMM')).select(['age', 'date_active', 'date_end', 'month']).show()
---+-----------+----------+-----+
|age|date_active| date_end|month|
+---+-----------+----------+-----+
|123| 2012-01-01|2012-10-01| Jan|
|123| 2012-01-01|2012-10-01| Feb|
|123| 2012-01-01|2012-10-01| Mar|
|123| 2012-01-01|2012-10-01| Apr|
|123| 2012-01-01|2012-10-01| May|
|123| 2012-01-01|2012-10-01| Jun|
|123| 2012-01-01|2012-10-01| Jul|
|123| 2012-01-01|2012-10-01| Aug|
|123| 2012-01-01|2012-10-01| Sep|
|123| 2012-01-01|2012-10-01| Oct|
|234| 2012-01-01|2012-05-01| Jan|
|234| 2012-01-01|2012-05-01| Feb|
|234| 2012-01-01|2012-05-01| Mar|
|234| 2012-01-01|2012-05-01| Apr|
|234| 2012-01-01|2012-05-01| May|
|345| 2012-01-01|2012-11-01| Jan|
|345| 2012-01-01|2012-11-01| Feb|
|345| 2012-01-01|2012-11-01| Mar|
|345| 2012-01-01|2012-11-01| Apr|
|345| 2012-01-01|2012-11-01| May|
+---+-----------+----------+-----+