Feature engineering: carry forward the last observed value
For the given dataframe
spark.createDataFrame([
("2019-06-24T07:29:22.000+0000", "Image Quality: 75"),
("2019-06-25T07:29:22.000+0000", "Start scan"),
("2019-06-26T07:29:22.000+0000", "Image Quality: 95"),
("2019-06-27T07:29:22.000+0000", "Start scan"),
("2019-06-28T07:29:22.000+0000", "Start scan")
], ["ts", "message"])
I am interested in engineering an image-quality feature, i.e. composing the following dataframe.
+----------------------------+----------+-------------+
|ts |message |image_quality|
+----------------------------+----------+-------------+
|2019-06-25T07:29:22.000+0000|Start scan|75 |
|2019-06-27T07:29:22.000+0000|Start scan|95 |
|2019-06-28T07:29:22.000+0000|Start scan|95 |
+----------------------------+----------+-------------+
I have tried various combinations of window functions and subqueries, but I can't seem to find a working solution.
IIUC, you want to carry the last available image quality forward until the next one becomes available. This can be done with a Window. Try something like the following.
Assumption from the given dataset: for any date, the messages always start with Image Quality: <some value>, followed by Start scan.
Import and prepare the dataset:
# Import Window
from pyspark.sql.window import Window
import pyspark.sql.functions as f
df.show(10, False)
+----------------------------+-----------------+
|ts |message |
+----------------------------+-----------------+
|2019-06-24T07:29:22.000+0000|Image Quality: 75|
|2019-06-25T07:29:22.000+0000|Start scan |
|2019-06-26T07:29:22.000+0000|Image Quality: 95|
|2019-06-27T07:29:22.000+0000|Start scan |
|2019-06-28T07:29:22.000+0000|Start scan |
+----------------------------+-----------------+
Now split message on the : delimiter and create the image_quality column:
df1 = df.withColumn('image_quality', f.split('message', ':')[1])
df1.show(10, False)
+----------------------------+-----------------+-------------+
|ts |message |image_quality|
+----------------------------+-----------------+-------------+
|2019-06-24T07:29:22.000+0000|Image Quality: 75| 75 |
|2019-06-25T07:29:22.000+0000|Start scan |null |
|2019-06-26T07:29:22.000+0000|Image Quality: 95| 95 |
|2019-06-27T07:29:22.000+0000|Start scan |null |
|2019-06-28T07:29:22.000+0000|Start scan |null |
+----------------------------+-----------------+-------------+
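As a side note, a single regexp_extract plus a cast could do the same extraction in one step; an empty match casts to null, so the forward fill below would still work. A minimal sketch (not used in the rest of the walkthrough):
# Alternative sketch: extract the digits and cast to int directly.
# regexp_extract returns '' when there is no match, and '' cast to int becomes null.
df_alt = df.withColumn('image_quality', f.regexp_extract('message', r'Image Quality: (\d+)', 1).cast('int'))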
Define the window, ordered by the ts column.
Note: since we are only interested in a working solution here, no partitionBy column is added, but always add a partitionBy if possible (see the sketch after the window definition below).
Spark will also emit the following warning:
WARN WindowExec: No Partition Defined for Window operation! Moving all
data to a single partition, this can cause serious performance
degradation.
w_spec = Window.orderBy('ts')
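If the log covered multiple devices, the window should be partitioned per device; a sketch, assuming a hypothetical scanner_id column:
# Sketch (assumption): with a grouping column such as scanner_id,
# partitioning avoids moving all data to a single partition.
# w_spec = Window.partitionBy('scanner_id').orderBy('ts')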
Final dataset preparation:
Now apply the window and use last('col', True) to find the last available image quality; here True means null values are ignored.
Additionally, filter the records so that only the message == 'Start scan' rows remain (i.e. the Image Quality rows are dropped).
final_df = df1.withColumn('image_quality', f.coalesce('image_quality', f.last('image_quality', True).over(w_spec))) \
.where(df1.message == 'Start scan')
final_df.show()
+--------------------+----------+-------------+
| ts| message|image_quality|
+--------------------+----------+-------------+
|2019-06-25T07:29:...|Start scan| 75|
|2019-06-27T07:29:...|Start scan| 95|
|2019-06-28T07:29:...|Start scan| 95|
+--------------------+----------+-------------+
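Since split() leaves a leading space and the column is still a string, you may want to trim and cast image_quality for downstream feature use; a minimal sketch:
# Optional sketch: trim the leading space left by split() and cast to an integer type.
final_df = final_df.withColumn('image_quality', f.trim('image_quality').cast('int'))
final_df.printSchema()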