Create a column to accumulate the data in an array in PySpark

I need to create a field that accumulates the data of a column into an array. I have the following dataframe:

+----------------+-------------------+----------+---------------------+
|localSymbol_drop|end_window_drop    |detect_DRM|last_detect_price_DRM|
+----------------+-------------------+----------+---------------------+
|BABA            |2021-06-15 16:36:30|NO        |NA                   |
|BABA            |2021-06-15 16:37:00|NO        |NA                   |
|BABA            |2021-06-15 16:37:30|YES       |211.85               |
|BABA            |2021-06-15 16:38:00|NO        |NA                   |
|BABA            |2021-06-15 16:38:30|NO        |NA                   |
|BABA            |2021-06-15 16:40:30|NO        |NA                   |
|BABA            |2021-06-15 16:41:00|YES       |211.91               |
|BABA            |2021-06-15 16:42:00|NO        |NA                   |
|BABA            |2021-06-15 16:42:30|YES       |211.83               |
+----------------+-------------------+----------+---------------------+

The result would be:

+----------------+-------------------+----------+---------------------+----------------------------------------+
|localSymbol_drop|end_window_drop    |detect_DRM|last_detect_price_DRM|accum_array                             |
+----------------+-------------------+----------+---------------------+----------------------------------------+
|BABA            |2021-06-15 16:36:30|NO        |NA                   |[NA]                                    |
|BABA            |2021-06-15 16:37:00|NO        |NA                   |[NA,NA]                                 |
|BABA            |2021-06-15 16:37:30|YES       |211.85               |[NA,NA,211.85]                          |
|BABA            |2021-06-15 16:38:00|NO        |NA                   |[NA,NA,211.85,NA]                       |
|BABA            |2021-06-15 16:38:30|NO        |NA                   |[NA,NA,211.85,NA,NA]                    |
|BABA            |2021-06-15 16:40:30|NO        |NA                   |[NA,NA,211.85,NA,NA,NA]                 |
|BABA            |2021-06-15 16:41:00|YES       |211.91               |[NA,NA,211.85,NA,NA,NA,211.91]          |
|BABA            |2021-06-15 16:42:00|NO        |NA                   |[NA,NA,211.85,NA,NA,NA,211.91,NA]       |
|BABA            |2021-06-15 16:42:30|YES       |211.83               |[NA,NA,211.85,NA,NA,NA,211.91,NA,211.83]|
+----------------+-------------------+----------+---------------------+----------------------------------------+

Any ideas? Thank you!!

For my solution, you first need to create an index on the dataframe:

1)
from pyspark.sql.functions import row_number
from pyspark.sql.window import Window

# Assign a sequential 1-based index, ordered by the column to accumulate
w = Window.orderBy("last_detect_price_DRM")
df = df.withColumn("index", row_number().over(w))

Once your dataframe has an index, you need to collect all values from the column you want to accumulate and sort that list (so it is in the same order as your dataframe):

2)
from pyspark.sql import functions as f

# Collect every value of the column into a Python list, then sort it so
# it matches the window order used for the index
my_list = df.select(f.collect_list('last_detect_price_DRM')).first()[0]
my_list.sort()

Now you just need to create a UserDefinedFunction that takes the index as input and returns all elements of the list up to that index. After that, you simply call the function on your dataframe with withColumn('columnName', udf):

3)
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType, ArrayType

def custom_func(index):
    # Return the first `index` elements of the collected, sorted list
    return my_list[0:index]


custom_func_udf = udf(custom_func, ArrayType(StringType()))

df = df.withColumn('acc', custom_func_udf(col('index')))

This accumulates all values of the given column:

1.
+------+------+
|    _1|    _2|
+------+------+
|  Java| 20000|
|Python|100000|
| Scala|  3000|
+------+------+

2.
+------+------+-----+
|    _1|    _2|index|
+------+------+-----+
|Python|100000|    1|
|  Java| 20000|    2|
| Scala|  3000|    3|
+------+------+-----+

3.
+------+------+-----+--------------------+
|    _1|    _2|index|                 acc|
+------+------+-----+--------------------+
|Python|100000|    1|            [100000]|
|  Java| 20000|    2|     [100000, 20000]|
| Scala|  3000|    3|[100000, 20000, 3...|
+------+------+-----+--------------------+
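To see what the three steps compute, the same logic can be traced in plain Python — a minimal sketch of the mechanism, not Spark code. Note the values are strings, which is why "100000" sorts before "20000" in the output above:

```python
# Plain-Python analogue of steps 1-3 (no Spark needed)
data = [("Java", "20000"), ("Python", "100000"), ("Scala", "3000")]

# 1) order by the value column; the position is the 1-based index
ordered = sorted(data, key=lambda row: row[1])

# 2) the sorted value list that the UDF closes over
my_list = [value for _, value in ordered]

# 3) custom_func(index) returns the first `index` elements
acc = [my_list[:index] for index in range(1, len(ordered) + 1)]
print(acc[1])  # ['100000', '20000']
```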

Another solution, without a UDF

First I compute an index:

from pyspark.sql import functions as f
from pyspark.sql.window import Window

win = Window.partitionBy('localSymbol_drop').orderBy('end_window_drop')
historical_trends = historical_trends.withColumn("index", f.row_number().over(win))

Then I get the ordered list:

list_last_detect_price_DRM = historical_trends\
    .orderBy('end_window_drop')\
    .groupby('localSymbol_drop')\
    .agg(
        f.collect_list("last_detect_price_DRM").alias('list_last_detect_price_DRM')
    ).select(
        f.col('list_last_detect_price_DRM'),
        f.col('localSymbol_drop').alias('localSymbol_other_drop')
    )

Once I have the ordered list, I join it back to the original df and apply the slice function:

historical_trends = historical_trends\
    .join(list_last_detect_price_DRM,
          on=[historical_trends.localSymbol_drop == list_last_detect_price_DRM.localSymbol_other_drop],
          how='left').distinct()\
    .withColumn(
        "correct_list_last_detect_price_DRM",
        f.expr("slice(list_last_detect_price_DRM, 1, index)")
    ).drop('list_last_detect_price_DRM', 'localSymbol_other_drop')
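Note that Spark's `slice` is 1-based: `slice(list_last_detect_price_DRM, 1, index)` keeps the first `index` elements of the list. Per row, that is the equivalent of Python's `lst[:index]` — a quick plain-Python sketch of the per-row trimming:

```python
# The join copies the full ordered list onto every row; slice(..., 1, index)
# then trims it to the first `index` elements, i.e. full_list[:index]
full_list = ["NA", "NA", "211.85", "NA", "NA"]
accum = [full_list[:index] for index in range(1, len(full_list) + 1)]
print(accum[2])  # ['NA', 'NA', '211.85']
```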

The result is:

+----------------+-------------------+----------+---------------------+-----+--------------------------------------------------------------------------------------------------------------------------------------------+
|localSymbol_drop|end_window_drop    |detect_DRM|last_detect_price_DRM|index|correct_list_last_detect_price_DRM                                                                                                          |
+----------------+-------------------+----------+---------------------+-----+--------------------------------------------------------------------------------------------------------------------------------------------+
|BABA            |2021-06-15 16:36:30|NO        |NA                   |1    |[NA]                                                                                                                                        |
|BABA            |2021-06-15 16:37:00|NO        |NA                   |2    |[NA, NA]                                                                                                                                    |
|BABA            |2021-06-15 16:37:30|YES       |211.85               |3    |[NA, NA, 211.85]                                                                                                                            |
|BABA            |2021-06-15 16:38:00|NO        |NA                   |4    |[NA, NA, 211.85, NA]                                                                                                                        |
|BABA            |2021-06-15 16:38:30|NO        |NA                   |5    |[NA, NA, 211.85, NA, NA]                                                                                                                    |
|BABA            |2021-06-15 16:39:00|NO        |NA                   |6    |[NA, NA, 211.85, NA, NA, NA]                                                                                                                |
|BABA            |2021-06-15 16:39:30|NO        |NA                   |7    |[NA, NA, 211.85, NA, NA, NA, NA]                                                                                                            |
|BABA            |2021-06-15 16:40:00|NO        |NA                   |8    |[NA, NA, 211.85, NA, NA, NA, NA, NA]                                                                                                        |
|BABA            |2021-06-15 16:40:30|NO        |NA                   |9    |[NA, NA, 211.85, NA, NA, NA, NA, NA, NA]                                                                                                    |
|BABA            |2021-06-15 16:41:00|YES       |211.91               |10   |[NA, NA, 211.85, NA, NA, NA, NA, NA, NA, 211.91]                                                                                            |
|BABA            |2021-06-15 16:41:30|NO        |NA                   |11   |[NA, NA, 211.85, NA, NA, NA, NA, NA, NA, 211.91, NA]                                                                                        |
|BABA            |2021-06-15 16:42:00|NO        |NA                   |12   |[NA, NA, 211.85, NA, NA, NA, NA, NA, NA, 211.91, NA, NA]                                                                                    |
|BABA            |2021-06-15 16:42:30|YES       |211.83               |13   |[NA, NA, 211.85, NA, NA, NA, NA, NA, NA, 211.91, NA, NA, 211.83]                                                                            |
|BABA            |2021-06-15 16:43:00|NO        |NA                   |14   |[NA, NA, 211.85, NA, NA, NA, NA, NA, NA, 211.91, NA, NA, 211.83, NA]                                                                        |
|BABA            |2021-06-15 16:43:30|YES       |211.75               |15   |[NA, NA, 211.85, NA, NA, NA, NA, NA, NA, 211.91, NA, NA, 211.83, NA, 211.75]                                                                |
|BABA            |2021-06-15 16:44:00|NO        |NA                   |16   |[NA, NA, 211.85, NA, NA, NA, NA, NA, NA, 211.91, NA, NA, 211.83, NA, 211.75, NA]                                                            |
|BABA            |2021-06-15 16:44:30|NO        |NA                   |17   |[NA, NA, 211.85, NA, NA, NA, NA, NA, NA, 211.91, NA, NA, 211.83, NA, 211.75, NA, NA]                                                        |
|BABA            |2021-06-15 16:45:00|NO        |NA                   |18   |[NA, NA, 211.85, NA, NA, NA, NA, NA, NA, 211.91, NA, NA, 211.83, NA, 211.75, NA, NA, NA]                                                    |
|BABA            |2021-06-15 16:45:30|YES       |211.72               |19   |[NA, NA, 211.85, NA, NA, NA, NA, NA, NA, 211.91, NA, NA, 211.83, NA, 211.75, NA, NA, NA, 211.72]                                            |
|BABA            |2021-06-15 16:46:00|NO        |NA                   |20   |[NA, NA, 211.85, NA, NA, NA, NA, NA, NA, 211.91, NA, NA, 211.83, NA, 211.75, NA, NA, NA, 211.72, NA]                                        |
|BABA            |2021-06-15 16:46:30|NO        |NA                   |21   |[NA, NA, 211.85, NA, NA, NA, NA, NA, NA, 211.91, NA, NA, 211.83, NA, 211.75, NA, NA, NA, 211.72, NA, NA]                                    |
|BABA            |2021-06-15 16:47:00|NO        |NA                   |22   |[NA, NA, 211.85, NA, NA, NA, NA, NA, NA, 211.91, NA, NA, 211.83, NA, 211.75, NA, NA, NA, 211.72, NA, NA, NA]                                |
|BABA            |2021-06-15 16:47:30|YES       |211.81               |23   |[NA, NA, 211.85, NA, NA, NA, NA, NA, NA, 211.91, NA, NA, 211.83, NA, 211.75, NA, NA, NA, 211.72, NA, NA, NA, 211.81]                        |
|BABA            |2021-06-15 16:48:00|NO        |NA                   |24   |[NA, NA, 211.85, NA, NA, NA, NA, NA, NA, 211.91, NA, NA, 211.83, NA, 211.75, NA, NA, NA, 211.72, NA, NA, NA, 211.81, NA]                    |
|BABA            |2021-06-15 16:48:30|NO        |NA                   |25   |[NA, NA, 211.85, NA, NA, NA, NA, NA, NA, 211.91, NA, NA, 211.83, NA, 211.75, NA, NA, NA, 211.72, NA, NA, NA, 211.81, NA, NA]                |
|BABA            |2021-06-15 16:49:00|YES       |211.93               |26   |[NA, NA, 211.85, NA, NA, NA, NA, NA, NA, 211.91, NA, NA, 211.83, NA, 211.75, NA, NA, NA, 211.72, NA, NA, NA, 211.81, NA, NA, 211.93]        |
|BABA            |2021-06-15 16:49:30|NO        |NA                   |27   |[NA, NA, 211.85, NA, NA, NA, NA, NA, NA, 211.91, NA, NA, 211.83, NA, 211.75, NA, NA, NA, 211.72, NA, NA, NA, 211.81, NA, NA, 211.93, NA]    |
|BABA            |2021-06-15 16:50:00|NO        |NA                   |28   |[NA, NA, 211.85, NA, NA, NA, NA, NA, NA, 211.91, NA, NA, 211.83, NA, 211.75, NA, NA, NA, 211.72, NA, NA, NA, 211.81, NA, NA, 211.93, NA, NA]|
+----------------+-------------------+----------+---------------------+-----+--------------------------------------------------------------------------------------------------------------------------------------------+