Create a column to accumulate the data in an array (PySpark)
I need to create a field that accumulates the data in an array.
I have the following dataframe:
+----------------+-------------------+----------+---------------------+
|localSymbol_drop|end_window_drop |detect_DRM|last_detect_price_DRM|
+----------------+-------------------+----------+---------------------+
|BABA |2021-06-15 16:36:30|NO |NA |
|BABA |2021-06-15 16:37:00|NO |NA |
|BABA |2021-06-15 16:37:30|YES |211.85 |
|BABA |2021-06-15 16:38:00|NO |NA |
|BABA |2021-06-15 16:38:30|NO |NA |
|BABA |2021-06-15 16:40:30|NO |NA |
|BABA |2021-06-15 16:41:00|YES |211.91 |
|BABA |2021-06-15 16:42:00|NO |NA |
|BABA |2021-06-15 16:42:30|YES |211.83 |
+----------------+-------------------+----------+---------------------+
The result would be:
+----------------+-------------------+----------+---------------------+----------------------------------------+
|localSymbol_drop|end_window_drop |detect_DRM|last_detect_price_DRM|accum_array |
+----------------+-------------------+----------+---------------------+----------------------------------------+
|BABA |2021-06-15 16:36:30|NO |NA |[NA] |
|BABA |2021-06-15 16:37:00|NO |NA |[NA,NA] |
|BABA |2021-06-15 16:37:30|YES |211.85 |[NA,NA,211.85] |
|BABA |2021-06-15 16:38:00|NO |NA |[NA,NA,211.85,NA] |
|BABA |2021-06-15 16:38:30|NO |NA |[NA,NA,211.85,NA,NA] |
|BABA |2021-06-15 16:40:30|NO |NA |[NA,NA,211.85,NA,NA,NA] |
|BABA |2021-06-15 16:41:00|YES |211.91 |[NA,NA,211.85,NA,NA,NA,211.91] |
|BABA |2021-06-15 16:42:00|NO |NA |[NA,NA,211.85,NA,NA,NA,211.91,NA] |
|BABA |2021-06-15 16:42:30|YES |211.83 |[NA,NA,211.85,NA,NA,NA,211.91,NA,211.83]|
+----------------+-------------------+----------+---------------------+----------------------------------------+
Any ideas? Thanks!!
For my solution, you first need to create an index on the dataframe:
1)
from pyspark.sql.functions import row_number
from pyspark.sql.window import Window

# Global window (no partitionBy): Spark will warn that all data moves to a single partition
w = Window.orderBy("last_detect_price_DRM")
df = df.withColumn("index", row_number().over(w))
Once you have an index on your dataframe, you need to collect all the values of the column you want to accumulate and sort that list (so that it is in the same order as your dataframe):
2)
from pyspark.sql import functions as f

my_list = df.select(f.collect_list('last_detect_price_DRM')).first()[0]
my_list.sort()
Now you just need to create a UserDefinedFunction that takes the index as input and returns all elements of the list up to that index. After that, you simply call the function on your dataframe with withColumn('columnName', udf):
3)
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType, ArrayType

def custom_func(index):
    return my_list[0:index]

custom_udf = udf(custom_func, ArrayType(StringType()))
df = df.withColumn('acc', custom_udf(col('index')))
This will accumulate all the values in the given column. Outputs after each step, on a small example dataframe:
1.
+------+------+
| _1| _2|
+------+------+
| Java| 20000|
|Python|100000|
| Scala| 3000|
+------+------+
2.
+------+------+-----+
| _1| _2|index|
+------+------+-----+
|Python|100000| 1|
| Java| 20000| 2|
| Scala| 3000| 3|
+------+------+-----+
3.
+------+------+-----+--------------------+
| _1| _2|index| acc|
+------+------+-----+--------------------+
|Python|100000| 1| [100000]|
| Java| 20000| 2| [100000, 20000]|
| Scala| 3000| 3|[100000, 20000, 3...|
+------+------+-----+--------------------+
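The prefix logic the UDF implements can be sanity-checked in plain Python, with no Spark needed. The values below are hypothetical, mirroring the example tables above (already ordered as the index expects):

```python
# Plain-Python sketch of what the UDF computes: for each 1-based
# row_number index, the prefix of the sorted value list up to that index.
my_list = ["100000", "20000", "3000"]  # hypothetical, mirrors the example tables

def custom_func(index):
    # row_number() is 1-based, so [0:index] yields the first `index` elements
    return my_list[0:index]

print(custom_func(1))  # ['100000']
print(custom_func(2))  # ['100000', '20000']
```

Note that `my_list` is captured in the UDF's closure, so it is serialized and shipped to the executors along with the function.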
An alternative solution, without a UDF.
First I compute an index:
from pyspark.sql import functions as f
from pyspark.sql.window import Window

win = Window.partitionBy('localSymbol_drop').orderBy('end_window_drop')
historical_trends = historical_trends.withColumn("index", f.row_number().over(win))
Then I get the ordered list:
list_last_detect_price_DRM = historical_trends\
.orderBy('end_window_drop')\
.groupby('localSymbol_drop')\
.agg(
f.collect_list("last_detect_price_DRM").alias('list_last_detect_price_DRM')
).select(
f.col('list_last_detect_price_DRM'),
f.col('localSymbol_drop').alias('localSymbol_other_drop')
)
Once I have the ordered list, I join it back to the original df and apply the slice function:
historical_trends = \
historical_trends \
.join(list_last_detect_price_DRM,
on=[
(historical_trends.localSymbol_drop == list_last_detect_price_DRM.localSymbol_other_drop)
],
how='left').distinct()\
.withColumn(
"correct_list_last_detect_price_DRM", f.expr("slice(list_last_detect_price_DRM,1,index)")
).drop('list_last_detect_price_DRM','localSymbol_other_drop')
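Spark SQL's `slice(arr, start, length)` is 1-based, so `slice(list_last_detect_price_DRM, 1, index)` yields the first `index` elements of the per-symbol list, i.e. the running prefix up to the current row. A minimal Python mimic of those semantics (positive `start` only) makes this easy to check without Spark:

```python
def spark_slice(arr, start, length):
    # Mimics Spark SQL slice() for positive start: 1-based start, `length` elements
    return arr[start - 1 : start - 1 + length]

full_list = ["NA", "NA", "211.85", "NA", "NA"]
print(spark_slice(full_list, 1, 3))  # ['NA', 'NA', '211.85']
```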
The result is:
+----------------+-------------------+----------+---------------------+-----+--------------------------------------------------------------------------------------------------------------------------------------------+
|localSymbol_drop|end_window_drop |detect_DRM|last_detect_price_DRM|index|correct_list_last_detect_price_DRM |
+----------------+-------------------+----------+---------------------+-----+--------------------------------------------------------------------------------------------------------------------------------------------+
|BABA |2021-06-15 16:36:30|NO |NA |1 |[NA] |
|BABA |2021-06-15 16:37:00|NO |NA |2 |[NA, NA] |
|BABA |2021-06-15 16:37:30|YES |211.85 |3 |[NA, NA, 211.85] |
|BABA |2021-06-15 16:38:00|NO |NA |4 |[NA, NA, 211.85, NA] |
|BABA |2021-06-15 16:38:30|NO |NA |5 |[NA, NA, 211.85, NA, NA] |
|BABA |2021-06-15 16:39:00|NO |NA |6 |[NA, NA, 211.85, NA, NA, NA] |
|BABA |2021-06-15 16:39:30|NO |NA |7 |[NA, NA, 211.85, NA, NA, NA, NA] |
|BABA |2021-06-15 16:40:00|NO |NA |8 |[NA, NA, 211.85, NA, NA, NA, NA, NA] |
|BABA |2021-06-15 16:40:30|NO |NA |9 |[NA, NA, 211.85, NA, NA, NA, NA, NA, NA] |
|BABA |2021-06-15 16:41:00|YES |211.91 |10 |[NA, NA, 211.85, NA, NA, NA, NA, NA, NA, 211.91] |
|BABA |2021-06-15 16:41:30|NO |NA |11 |[NA, NA, 211.85, NA, NA, NA, NA, NA, NA, 211.91, NA] |
|BABA |2021-06-15 16:42:00|NO |NA |12 |[NA, NA, 211.85, NA, NA, NA, NA, NA, NA, 211.91, NA, NA] |
|BABA |2021-06-15 16:42:30|YES |211.83 |13 |[NA, NA, 211.85, NA, NA, NA, NA, NA, NA, 211.91, NA, NA, 211.83] |
|BABA |2021-06-15 16:43:00|NO |NA |14 |[NA, NA, 211.85, NA, NA, NA, NA, NA, NA, 211.91, NA, NA, 211.83, NA] |
|BABA |2021-06-15 16:43:30|YES |211.75 |15 |[NA, NA, 211.85, NA, NA, NA, NA, NA, NA, 211.91, NA, NA, 211.83, NA, 211.75] |
|BABA |2021-06-15 16:44:00|NO |NA |16 |[NA, NA, 211.85, NA, NA, NA, NA, NA, NA, 211.91, NA, NA, 211.83, NA, 211.75, NA] |
|BABA |2021-06-15 16:44:30|NO |NA |17 |[NA, NA, 211.85, NA, NA, NA, NA, NA, NA, 211.91, NA, NA, 211.83, NA, 211.75, NA, NA] |
|BABA |2021-06-15 16:45:00|NO |NA |18 |[NA, NA, 211.85, NA, NA, NA, NA, NA, NA, 211.91, NA, NA, 211.83, NA, 211.75, NA, NA, NA] |
|BABA |2021-06-15 16:45:30|YES |211.72 |19 |[NA, NA, 211.85, NA, NA, NA, NA, NA, NA, 211.91, NA, NA, 211.83, NA, 211.75, NA, NA, NA, 211.72] |
|BABA |2021-06-15 16:46:00|NO |NA |20 |[NA, NA, 211.85, NA, NA, NA, NA, NA, NA, 211.91, NA, NA, 211.83, NA, 211.75, NA, NA, NA, 211.72, NA] |
|BABA |2021-06-15 16:46:30|NO |NA |21 |[NA, NA, 211.85, NA, NA, NA, NA, NA, NA, 211.91, NA, NA, 211.83, NA, 211.75, NA, NA, NA, 211.72, NA, NA] |
|BABA |2021-06-15 16:47:00|NO |NA |22 |[NA, NA, 211.85, NA, NA, NA, NA, NA, NA, 211.91, NA, NA, 211.83, NA, 211.75, NA, NA, NA, 211.72, NA, NA, NA] |
|BABA |2021-06-15 16:47:30|YES |211.81 |23 |[NA, NA, 211.85, NA, NA, NA, NA, NA, NA, 211.91, NA, NA, 211.83, NA, 211.75, NA, NA, NA, 211.72, NA, NA, NA, 211.81] |
|BABA |2021-06-15 16:48:00|NO |NA |24 |[NA, NA, 211.85, NA, NA, NA, NA, NA, NA, 211.91, NA, NA, 211.83, NA, 211.75, NA, NA, NA, 211.72, NA, NA, NA, 211.81, NA] |
|BABA |2021-06-15 16:48:30|NO |NA |25 |[NA, NA, 211.85, NA, NA, NA, NA, NA, NA, 211.91, NA, NA, 211.83, NA, 211.75, NA, NA, NA, 211.72, NA, NA, NA, 211.81, NA, NA] |
|BABA |2021-06-15 16:49:00|YES |211.93 |26 |[NA, NA, 211.85, NA, NA, NA, NA, NA, NA, 211.91, NA, NA, 211.83, NA, 211.75, NA, NA, NA, 211.72, NA, NA, NA, 211.81, NA, NA, 211.93] |
|BABA |2021-06-15 16:49:30|NO |NA |27 |[NA, NA, 211.85, NA, NA, NA, NA, NA, NA, 211.91, NA, NA, 211.83, NA, 211.75, NA, NA, NA, 211.72, NA, NA, NA, 211.81, NA, NA, 211.93, NA] |
|BABA |2021-06-15 16:50:00|NO |NA |28 |[NA, NA, 211.85, NA, NA, NA, NA, NA, NA, 211.91, NA, NA, 211.83, NA, 211.75, NA, NA, NA, 211.72, NA, NA, NA, 211.81, NA, NA, 211.93, NA, NA]|
+----------------+-------------------+----------+---------------------+-----+--------------------------------------------------------------------------------------------------------------------------------------------+
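For reference, the end-to-end behavior (partition by symbol, order by timestamp, running prefix) can be simulated in plain Python; the rows below are a subset of the example data, and this is only a sketch for validating the Spark output, not a replacement for it:

```python
from collections import defaultdict

# (symbol, timestamp, last_detect_price) — subset of the example rows
rows = [
    ("BABA", "2021-06-15 16:36:30", "NA"),
    ("BABA", "2021-06-15 16:37:00", "NA"),
    ("BABA", "2021-06-15 16:37:30", "211.85"),
]

acc = defaultdict(list)   # per-symbol running list
result = []
for symbol, ts, price in sorted(rows, key=lambda r: (r[0], r[1])):
    acc[symbol].append(price)
    # copy the current prefix so later appends don't mutate earlier rows
    result.append((symbol, ts, list(acc[symbol])))

print(result[-1][2])  # ['NA', 'NA', '211.85']
```

In Spark itself, the same running prefix can typically be obtained more directly with `f.collect_list('last_detect_price_DRM').over(win)` on the ordered window defined above, since an ordered window's default frame runs from the partition start to the current row; I have not benchmarked that variant against the join-plus-slice approach here.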