Spark Predicate 下推不适用于日期
Spark Predicate pushdown not working on date
我正在读取镶木地板文件并添加过滤器以匹配所有符合日期的记录 - 这里是 2021-04-03。该列不应为空,并且应该在给定的日期。
输入table
+---------+-----------+-------------------+
| lat| lng| eventDTLocal|
+---------+-----------+-------------------+
|34.269788| -98.239543|2021-04-03 19:18:58|
|29.780977| -95.749744|2021-04-03 19:33:24|
|48.150173|-122.191903|2021-04-03 17:25:00|
|40.652889| -74.185461|2021-04-03 20:27:55|
|41.747148| -87.799557|2021-04-03 19:52:39|
+---------+-----------+-------------------+
我已经尝试将列投射到最新,使用 substring_index 函数进行匹配,但我无法在推送的过滤器中获取它。
以下是我试过的代码:
df1 = spark.read.parquet("/Users/aadhithyahari/Downloads/awsfiles/part-00000-bfccec4c-7939-4f85-8fa9-5f1cb34f843a.c000.snappy.parquet") \
.select( 'lat', 'lng', 'eventDTLocal').filter("TO_DATE(CAST(UNIX_TIMESTAMP(`eventDTLocal`, 'yyyy-MM-dd HH:mm:ss') AS TIMESTAMP),'yyyy-MM-dd') == CAST('2021-04-03' AS DATE)").explain(extended=True)
过滤器仅在数据过滤器中列出,而不会在其他任何地方列出。我在这里错过了什么?
并非所有过滤器都可以向下推。通常,大多数包含 substring
或 unix_timestamp
等函数调用的过滤器都无法下推。 DataSourceStrategy.
实现了过滤器下推的完整逻辑
在这种情况下解决此限制的一种方法是将 eventDTLocal
的值存储为 unix 时间戳而不是 parquet 文件中的字符串,然后按特定毫秒数进行过滤。
#create some test data
data = [(52.5151923, 13.3824107, 1618760421000),
(1.0, 1.0, 1)]
spark.createDataFrame(data, schema=['lat', 'lng', 'eventDTLocal']) \
.write.mode("overwrite").parquet("dataWithUnixTime")
#get the first and last millisecond of the day
#the timezone has probably to be adjusted
from datetime import datetime, timezone
dt = datetime(2021, 4, 18)
start = dt.replace(tzinfo=timezone.utc).timestamp() * 1000
end = start + 24 * 60 * 60 * 1000 - 1
#run the query
df = spark.read.parquet("dataWithUnixTime") \
.filter(f"eventDTLocal >= {start} and eventDTLocal <= {end}")
df
的物理计划
== Physical Plan ==
*(1) Project [lat#9, lng#10, eventDTLocal#11L]
+- *(1) Filter ((isnotnull(eventDTLocal#11L) AND (eventDTLocal#11L >= 1618704000000)) AND (eventDTLocal#11L <= 1618790399999))
+- *(1) ColumnarToRow
+- FileScan parquet [lat#9,lng#10,eventDTLocal#11L] Batched: true, DataFilters: [isnotnull(eventDTLocal#11L), (eventDTLocal#11L >= 1618704000000), (eventDTLocal#11L <= 161879039..., Format: Parquet, Location: InMemoryFileIndex[file:/home/werner/Java/pyspark3/dataWithUnixTime], PartitionFilters: [], PushedFilters: [IsNotNull(eventDTLocal), GreaterThanOrEqual(eventDTLocal,1618704000000), LessThanOrEqual(eventDT..., ReadSchema: struct<lat:double,lng:double,eventDTLocal:bigint>
现在包含日期列的推送过滤器 GreaterThanOrEqual
和 LessThanOrEqual
。
我正在读取镶木地板文件并添加过滤器以匹配所有符合日期的记录 - 这里是 2021-04-03。该列不应为空,并且应该在给定的日期。
输入table
+---------+-----------+-------------------+
| lat| lng| eventDTLocal|
+---------+-----------+-------------------+
|34.269788| -98.239543|2021-04-03 19:18:58|
|29.780977| -95.749744|2021-04-03 19:33:24|
|48.150173|-122.191903|2021-04-03 17:25:00|
|40.652889| -74.185461|2021-04-03 20:27:55|
|41.747148| -87.799557|2021-04-03 19:52:39|
+---------+-----------+-------------------+
我已经尝试将列投射到最新,使用 substring_index 函数进行匹配,但我无法在推送的过滤器中获取它。
以下是我试过的代码:
df1 = spark.read.parquet("/Users/aadhithyahari/Downloads/awsfiles/part-00000-bfccec4c-7939-4f85-8fa9-5f1cb34f843a.c000.snappy.parquet") \
.select( 'lat', 'lng', 'eventDTLocal').filter("TO_DATE(CAST(UNIX_TIMESTAMP(`eventDTLocal`, 'yyyy-MM-dd HH:mm:ss') AS TIMESTAMP),'yyyy-MM-dd') == CAST('2021-04-03' AS DATE)").explain(extended=True)
过滤器仅在数据过滤器中列出,而不会在其他任何地方列出。我在这里错过了什么?
并非所有过滤器都可以向下推。通常,大多数包含 substring
或 unix_timestamp
等函数调用的过滤器都无法下推。 DataSourceStrategy.
在这种情况下解决此限制的一种方法是将 eventDTLocal
的值存储为 unix 时间戳而不是 parquet 文件中的字符串,然后按特定毫秒数进行过滤。
#create some test data
data = [(52.5151923, 13.3824107, 1618760421000),
(1.0, 1.0, 1)]
spark.createDataFrame(data, schema=['lat', 'lng', 'eventDTLocal']) \
.write.mode("overwrite").parquet("dataWithUnixTime")
#get the first and last millisecond of the day
#the timezone has probably to be adjusted
from datetime import datetime, timezone
dt = datetime(2021, 4, 18)
start = dt.replace(tzinfo=timezone.utc).timestamp() * 1000
end = start + 24 * 60 * 60 * 1000 - 1
#run the query
df = spark.read.parquet("dataWithUnixTime") \
.filter(f"eventDTLocal >= {start} and eventDTLocal <= {end}")
df
== Physical Plan ==
*(1) Project [lat#9, lng#10, eventDTLocal#11L]
+- *(1) Filter ((isnotnull(eventDTLocal#11L) AND (eventDTLocal#11L >= 1618704000000)) AND (eventDTLocal#11L <= 1618790399999))
+- *(1) ColumnarToRow
+- FileScan parquet [lat#9,lng#10,eventDTLocal#11L] Batched: true, DataFilters: [isnotnull(eventDTLocal#11L), (eventDTLocal#11L >= 1618704000000), (eventDTLocal#11L <= 161879039..., Format: Parquet, Location: InMemoryFileIndex[file:/home/werner/Java/pyspark3/dataWithUnixTime], PartitionFilters: [], PushedFilters: [IsNotNull(eventDTLocal), GreaterThanOrEqual(eventDTLocal,1618704000000), LessThanOrEqual(eventDT..., ReadSchema: struct<lat:double,lng:double,eventDTLocal:bigint>
现在包含日期列的推送过滤器 GreaterThanOrEqual
和 LessThanOrEqual
。