Feature engineering: carrying forward the last occurrence of a value

Given the following DataFrame:

df = spark.createDataFrame([
  ("2019-06-24T07:29:22.000+0000", "Image Quality: 75"),
  ("2019-06-25T07:29:22.000+0000", "Start scan"),
  ("2019-06-26T07:29:22.000+0000", "Image Quality: 95"),
  ("2019-06-27T07:29:22.000+0000", "Start scan"),
  ("2019-06-28T07:29:22.000+0000", "Start scan")
], ["ts", "message"])

I'm interested in engineering an image-quality feature, i.e. composing the following DataFrame:

+----------------------------+----------+-------------+
|ts                          |message   |image_quality|
+----------------------------+----------+-------------+
|2019-06-25T07:29:22.000+0000|Start scan|75           |
|2019-06-27T07:29:22.000+0000|Start scan|95           |
|2019-06-28T07:29:22.000+0000|Start scan|95           |
+----------------------------+----------+-------------+

I've tried various combinations of window functions and subqueries, but I can't seem to find a working solution.

IIUC, you want to carry the last available image quality forward until the next one becomes available.

This can be done with a Window; try something like the below.

Assumption based on the given dataset: for any date, there is always an Image Quality: <some value> row first, followed by Start scan rows.

Imports and dataset preparation:

# Import Window
from pyspark.sql.window import Window
import pyspark.sql.functions as f
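
If you run this standalone, you also need a SparkSession; a minimal sketch, assuming local mode and an app name chosen here for illustration:

from pyspark.sql import SparkSession
# sketch: local-mode session, enough to run this example end to end
spark = SparkSession.builder.master('local[*]').appName('forward-fill-demo').getOrCreate()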

df.show(10, False)
+----------------------------+-----------------+
|ts                          |message          |
+----------------------------+-----------------+
|2019-06-24T07:29:22.000+0000|Image Quality: 75|
|2019-06-25T07:29:22.000+0000|Start scan       |
|2019-06-26T07:29:22.000+0000|Image Quality: 95|
|2019-06-27T07:29:22.000+0000|Start scan       |
|2019-06-28T07:29:22.000+0000|Start scan       |
+----------------------------+-----------------+

Now split message on the : delimiter to create image_quality:

# split on ':'; rows without a delimiter get null, matched values keep a leading space
df1 = df.withColumn('image_quality', f.split('message', ':')[1])
df1.show(10, False)
+----------------------------+-----------------+-------------+
|ts                          |message          |image_quality|
+----------------------------+-----------------+-------------+
|2019-06-24T07:29:22.000+0000|Image Quality: 75| 75          |
|2019-06-25T07:29:22.000+0000|Start scan       |null         |
|2019-06-26T07:29:22.000+0000|Image Quality: 95| 95          |
|2019-06-27T07:29:22.000+0000|Start scan       |null         |
|2019-06-28T07:29:22.000+0000|Start scan       |null         |
+----------------------------+-----------------+-------------+
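
As an aside, if the messages are less uniform, a regex extraction is a bit more robust than splitting on : and yields a numeric column directly; a minimal sketch:

# alternative sketch: capture the trailing digits; a non-match yields '' which casts to null
df1_alt = df.withColumn('image_quality',
                        f.regexp_extract('message', r'Image Quality:\s*(\d+)', 1).cast('int'))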

Define the window, ordered by the ts column.

Note: since we're only interested in a workaround here, no partitionBy column is added, but always add a partitionBy when possible.

Spark will also emit the following warning:

WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.

w_spec = Window.orderBy('ts')
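
If the data had a natural grouping key, say a per-device identifier, partitioning the window on it would avoid the single-partition warning; a minimal sketch, assuming a hypothetical device_id column:

# sketch only: 'device_id' is a hypothetical column, not part of the sample data
w_spec_partitioned = Window.partitionBy('device_id').orderBy('ts')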

Final dataset preparation:

Now apply the window and use last('col', True) to find the last available image quality; here True means null values are ignored.

Also, filter out the Image Quality records, keeping only the rows where message == 'Start scan'.

# forward-fill: take the row's own value if present, else the last non-null value so far
final_df = df1.withColumn('image_quality', f.coalesce('image_quality', f.last('image_quality', True).over(w_spec))) \
            .where(df1.message == 'Start scan')

final_df.show()

+--------------------+----------+-------------+
|                  ts|   message|image_quality|
+--------------------+----------+-------------+
|2019-06-25T07:29:...|Start scan|           75|
|2019-06-27T07:29:...|Start scan|           95|
|2019-06-28T07:29:...|Start scan|           95|
+--------------------+----------+-------------+
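
For reference, the same logic can be written in Spark SQL; a minimal sketch, assuming the input DataFrame is registered as a temp view named events (a name chosen here for illustration):

# register the raw DataFrame, then forward-fill with last(..., TRUE) over an ordered window
df.createOrReplaceTempView('events')
spark.sql("""
    SELECT ts, message, image_quality
    FROM (
        SELECT ts, message,
               last(split(message, ':')[1], TRUE) OVER (ORDER BY ts) AS image_quality
        FROM events
    ) AS t
    WHERE message = 'Start scan'
""").show(10, False)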