Pyspark:功能工程师时间点指标
Pyspark: Feature Engineer Point-in-Time Metrics
我尝试时间点数据建模的时间最长。为了举例说明这个过程,假设有 4 位不同的顾客(一些是回头客,一些是新顾客)从一家商店购物(如下所示)
我正在尝试使用 Pyspark 按 ID 对我的代码进行分区,并创建两个特征:均值(在那个时间点)购买和总购买。结果将显示如下:
我通过简单的累加求和确定了 Total_Purchase_1 列,但我无法在我的一生中得到计算平均值的时间点平均值 (Mean_Purchase_1)仅使用该日期的滞后值。
有没有人有过处理这类特征工程任务的经验?我是 R 大佬,有使用 dplyr 甚至 Pandas in Python 处理它的经验,但我正在尝试使用 Pyspark 对其进行优化。谢谢!
这应该是适合您的解决方案,请使用 window function
和 mean()
在此处创建 DF
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.sql.window import Window as W
df = spark.createDataFrame([("B55","2018-05-28", 200),
("B55","2016-05-01", 300),
("B55","2015-02-10", 1000),
("A37","2017-12-30", 2100),
("A37","2016-06-21", 2000)
],[ "id","date","value"])
df.show()
_w = W.partitionBy("id").orderBy("date")
df = df.withColumn("mean", F.mean("value").over(_w))
df.show()
输入
+---+----------+-----+
| id| date|value|
+---+----------+-----+
|B55|2018-05-28| 200|
|B55|2016-05-01| 300|
|B55|2015-02-10| 1000|
|A37|2017-12-30| 2100|
|A37|2016-06-21| 2000|
+---+----------+-----+
输出
+---+----------+-----+------+
| id| date|value| mean|
+---+----------+-----+------+
|A37|2016-06-21| 2000|2000.0|
|A37|2017-12-30| 2100|2050.0|
|B55|2015-02-10| 1000|1000.0|
|B55|2016-05-01| 300| 650.0|
|B55|2018-05-28| 200| 500.0|
+---+----------+-----+------+
我尝试时间点数据建模的时间最长。为了举例说明这个过程,假设有 4 位不同的顾客(一些是回头客,一些是新顾客)从一家商店购物(如下所示)
我正在尝试使用 Pyspark 按 ID 对我的代码进行分区,并创建两个特征:均值(在那个时间点)购买和总购买。结果将显示如下:
我通过简单的累加求和确定了 Total_Purchase_1 列,但我无法在我的一生中得到计算平均值的时间点平均值 (Mean_Purchase_1)仅使用该日期的滞后值。
有没有人有过处理这类特征工程任务的经验?我是 R 大佬,有使用 dplyr 甚至 Pandas in Python 处理它的经验,但我正在尝试使用 Pyspark 对其进行优化。谢谢!
这应该是适合您的解决方案,请使用 window function
和 mean()
在此处创建 DF
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.sql.window import Window as W
df = spark.createDataFrame([("B55","2018-05-28", 200),
("B55","2016-05-01", 300),
("B55","2015-02-10", 1000),
("A37","2017-12-30", 2100),
("A37","2016-06-21", 2000)
],[ "id","date","value"])
df.show()
_w = W.partitionBy("id").orderBy("date")
df = df.withColumn("mean", F.mean("value").over(_w))
df.show()
输入
+---+----------+-----+
| id| date|value|
+---+----------+-----+
|B55|2018-05-28| 200|
|B55|2016-05-01| 300|
|B55|2015-02-10| 1000|
|A37|2017-12-30| 2100|
|A37|2016-06-21| 2000|
+---+----------+-----+
输出
+---+----------+-----+------+
| id| date|value| mean|
+---+----------+-----+------+
|A37|2016-06-21| 2000|2000.0|
|A37|2017-12-30| 2100|2050.0|
|B55|2015-02-10| 1000|1000.0|
|B55|2016-05-01| 300| 650.0|
|B55|2018-05-28| 200| 500.0|
+---+----------+-----+------+