Time filtering a struct column in Pyspark dataframe
I have a dataframe with columns that contain structs of dates and values, so the schema looks like this:
root
 |-- col1: struct (nullable = true)
 |    |-- dates: array (nullable = true)
 |    |    |-- element: timestamp (containsNull = true)
 |    |-- values: array (nullable = true)
 |    |    |-- element: double (containsNull = true)
 |-- col2: struct (nullable = true)
 |    |-- dates: array (nullable = true)
 |    |    |-- element: timestamp (containsNull = true)
 |    |-- values: array (nullable = true)
 |    |    |-- element: double (containsNull = true)
 |-- id: string (nullable = true)
Given some time index:
time_index = datetime.datetime(2015, 12, 12, 4, 45)
and a number of days before and after it:
min_diff = -1 and max_diff = 2
I would like new columns col1_filt and col2_filt with the same structure, which return the dates that fall within the window defined by time_index, min_diff and max_diff, together with the corresponding values. If none of the dates or values fall within the window, I would like it to return None.
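For concreteness, with these values the window boils down to a pair of timestamps (a quick sketch in plain Python; the strict inequalities match the UDF shown further down):
import datetime

time_index = datetime.datetime(2015, 12, 12, 4, 45)
min_diff = -1   # one day before time_index
max_diff = 2    # two days after time_index

lower = time_index + datetime.timedelta(days=min_diff)   # 2015-12-11 04:45:00
upper = time_index + datetime.timedelta(days=max_diff)   # 2015-12-14 04:45:00
# a date d is kept when lower < d < upper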
Here is an example DataFrame to work with.
Example DataFrame:
example_input = [
    Row(
        id = "A",
        col1 = Row(
            dates = [datetime.datetime(2015, 12, 11, 5, 28), datetime.datetime(2015, 12, 12, 4, 45), datetime.datetime(2015, 12, 13, 5, 9)],
            values = [17.7, 19.1, 19.1]
        ),
        col2 = Row(
            dates = [datetime.datetime(2015, 12, 13, 4, 48), datetime.datetime(2015, 12, 15, 5, 8)],
            values = [19.1, 19.1]
        )
    ),
    Row(
        id = "B",
        col1 = Row(
            dates = [datetime.datetime(2017, 1, 13, 5, 9)],
            values = [19.1]
        ),
        col2 = Row(
            dates = [datetime.datetime(2017, 1, 12, 2, 48), datetime.datetime(2017, 1, 15, 5, 8)],
            values = [19.5, 29.1]
        )
    ),
]
df = spark.createDataFrame(example_input)
Displaying df:
+-------------------------------------------------------------------------------------+----------------------------------------------------------+---+
|col1 |col2 |id |
+-------------------------------------------------------------------------------------+----------------------------------------------------------+---+
|[[2015-12-11 05:28:00, 2015-12-12 04:45:00, 2015-12-13 05:09:00], [17.7, 19.1, 19.1]]|[[2015-12-13 04:48:00, 2015-12-15 05:08:00], [19.1, 19.1]]|A |
|[[2017-01-13 05:09:00], [19.1]] |[[2017-01-12 02:48:00, 2017-01-15 05:08:00], [19.5, 29.1]]|B |
+-------------------------------------------------------------------------------------+----------------------------------------------------------+---+
I have some code that takes a PySpark Row object and returns a filtered Row object, but I'm not sure how to turn it into a UDF.
Below is an example of doing the filtering with a UDF:
import datetime

import pyspark.sql.functions as F
from pyspark.sql import Row

time_index = datetime.datetime(2015, 12, 12, 4, 45)
min_diff = -1
max_diff = 2

def time_filter(r):
    # Pair up dates and values, keep only the pairs whose date falls
    # strictly inside the window, then unzip back into two tuples.
    ret = list(zip(*[
        x for x in zip(r['dates'], r['values'])
        if x[0] > time_index + datetime.timedelta(days=min_diff)
        and x[0] < time_index + datetime.timedelta(days=max_diff)
    ]))
    # Return a Row matching the declared struct type, or None if nothing survived.
    return Row(dates=ret[0], values=ret[1]) if len(ret) != 0 else None

time_filter_udf = F.udf(time_filter, 'struct<dates:array<timestamp>,values:array<double>>')

df2 = df.withColumn('col1_filt', time_filter_udf('col1')) \
        .withColumn('col2_filt', time_filter_udf('col2'))
df2.show(truncate=False)
+-------------------------------------------------------------------------------------+----------------------------------------------------------+---+-------------------------------------------------------------------------------------+-------------------------------+
|col1 |col2 |id |col1_filt |col2_filt |
+-------------------------------------------------------------------------------------+----------------------------------------------------------+---+-------------------------------------------------------------------------------------+-------------------------------+
|[[2015-12-11 05:28:00, 2015-12-12 04:45:00, 2015-12-13 05:09:00], [17.7, 19.1, 19.1]]|[[2015-12-13 04:48:00, 2015-12-15 05:08:00], [19.1, 19.1]]|A |[[2015-12-11 05:28:00, 2015-12-12 04:45:00, 2015-12-13 05:09:00], [17.7, 19.1, 19.1]]|[[2015-12-13 04:48:00], [19.1]]|
|[[2017-01-13 05:09:00], [19.1]] |[[2017-01-12 02:48:00, 2017-01-15 05:08:00], [19.5, 29.1]]|B |null |null |
+-------------------------------------------------------------------------------------+----------------------------------------------------------+---+-------------------------------------------------------------------------------------+-------------------------------+
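For reference, the same filtering can also be done without a Python UDF by using Spark's SQL higher-order functions. The sketch below is an alternative, not part of the answer above: it assumes Spark 2.4+ (for transform and filter), hard-codes the window bounds, and the helper name filt_expr is introduced here purely for illustration.
import pyspark.sql.functions as F

lower = "2015-12-11 04:45:00"   # time_index + min_diff days
upper = "2015-12-14 04:45:00"   # time_index + max_diff days

def filt_expr(c):
    # Pair each date with its value, keep only the pairs whose date falls
    # inside the window, then unzip back into a struct of two arrays
    # (or null if nothing is left).
    kept = (
        f"filter("
        f"transform({c}.dates, (d, i) -> named_struct('date', d, 'value', {c}.values[i])), "
        f"x -> x.date > cast('{lower}' as timestamp) "
        f"and x.date < cast('{upper}' as timestamp))"
    )
    return F.expr(
        f"if(size({kept}) > 0, "
        f"named_struct('dates', transform({kept}, x -> x.date), "
        f"'values', transform({kept}, x -> x.value)), "
        f"null)"
    )

df3 = df.withColumn('col1_filt', filt_expr('col1')) \
        .withColumn('col2_filt', filt_expr('col2'))
df3.show(truncate=False)
Staying in Spark SQL avoids the Python serialization overhead of a UDF, at the cost of a more verbose expression (the kept sub-expression is repeated three times in the generated SQL).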