Spark Predicate pushdown not working on date

I am reading a parquet file and adding a filter to match all records that fall on a given date, here 2021-04-03. The column should not be null and should fall on that date.

Input table:

+---------+-----------+-------------------+
|      lat|        lng|       eventDTLocal|
+---------+-----------+-------------------+
|34.269788| -98.239543|2021-04-03 19:18:58|
|29.780977| -95.749744|2021-04-03 19:33:24|
|48.150173|-122.191903|2021-04-03 17:25:00|
|40.652889| -74.185461|2021-04-03 20:27:55|
|41.747148| -87.799557|2021-04-03 19:52:39|
+---------+-----------+-------------------+

I have already tried casting the column to date and matching with the substring_index function, but I cannot get the filter to appear among the pushed filters.
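
Those attempts might have looked roughly like this (a reconstruction for illustration, not the exact code; the parquet path is the one from the snippet below):

df = spark.read.parquet("/Users/aadhithyahari/Downloads/awsfiles/part-00000-bfccec4c-7939-4f85-8fa9-5f1cb34f843a.c000.snappy.parquet")

#attempt 1: cast the string column to date
df.filter("CAST(eventDTLocal AS DATE) = DATE'2021-04-03'").explain()

#attempt 2: match on the date part extracted with substring_index
df.filter("SUBSTRING_INDEX(eventDTLocal, ' ', 1) = '2021-04-03'").explain()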

Here is the code I tried:

df1 = spark.read.parquet("/Users/aadhithyahari/Downloads/awsfiles/part-00000-bfccec4c-7939-4f85-8fa9-5f1cb34f843a.c000.snappy.parquet") \
    .select('lat', 'lng', 'eventDTLocal') \
    .filter("TO_DATE(CAST(UNIX_TIMESTAMP(`eventDTLocal`, 'yyyy-MM-dd HH:mm:ss') AS TIMESTAMP), 'yyyy-MM-dd') == CAST('2021-04-03' AS DATE)") \
    .explain(extended=True)

The filter is only listed under DataFilters and nowhere else. What am I missing here?

Not all filters can be pushed down. In general, most filters that contain function calls like substring or unix_timestamp cannot be pushed down. The complete logic for which filters are pushed down is implemented in DataSourceStrategy.
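
To see the difference, here is a minimal sketch (the file name pushdownDemo and the sample row are made up for illustration): a plain comparison on the raw column value appears under PushedFilters, while the same predicate wrapped in a function call stays in DataFilters only.

#create a tiny parquet file to inspect the plans
spark.createDataFrame([("2021-04-03 19:18:58",)], ["eventDTLocal"]) \
    .write.mode("overwrite").parquet("pushdownDemo")

df = spark.read.parquet("pushdownDemo")

#plain comparison on the raw column: shows up under PushedFilters
df.filter("eventDTLocal = '2021-04-03 19:18:58'").explain()

#function call on the column: listed in DataFilters only
#1617477538 is 2021-04-03 19:18:58 as unix seconds in UTC; the exact
#value depends on the session timezone and is irrelevant for the plan
df.filter("unix_timestamp(eventDTLocal, 'yyyy-MM-dd HH:mm:ss') = 1617477538").explain()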

One way to work around this limitation in this case is to store the values of eventDTLocal as unix timestamps instead of strings in the parquet file, and then filter on the specific milliseconds.

#create some test data
data = [(52.5151923, 13.3824107, 1618760421000), 
        (1.0, 1.0, 1)]
spark.createDataFrame(data, schema=['lat', 'lng', 'eventDTLocal']) \
    .write.mode("overwrite").parquet("dataWithUnixTime")

#get the first and last millisecond of the day
#the timezone has probably to be adjusted
from datetime import datetime, timezone
dt = datetime(2021, 4, 18)
start = dt.replace(tzinfo=timezone.utc).timestamp() * 1000
end = start + 24 * 60 * 60 * 1000 - 1

#run the query
df = spark.read.parquet("dataWithUnixTime") \
    .filter(f"eventDTLocal >= {start} and eventDTLocal <= {end}")

The physical plan of df:

== Physical Plan ==
*(1) Project [lat#9, lng#10, eventDTLocal#11L]
+- *(1) Filter ((isnotnull(eventDTLocal#11L) AND (eventDTLocal#11L >= 1618704000000)) AND (eventDTLocal#11L <= 1618790399999))
   +- *(1) ColumnarToRow
      +- FileScan parquet [lat#9,lng#10,eventDTLocal#11L] Batched: true, DataFilters: [isnotnull(eventDTLocal#11L), (eventDTLocal#11L >= 1618704000000), (eventDTLocal#11L <= 161879039..., Format: Parquet, Location: InMemoryFileIndex[file:/home/werner/Java/pyspark3/dataWithUnixTime], PartitionFilters: [], PushedFilters: [IsNotNull(eventDTLocal), GreaterThanOrEqual(eventDTLocal,1618704000000), LessThanOrEqual(eventDT..., ReadSchema: struct<lat:double,lng:double,eventDTLocal:bigint>

The plan now contains the pushed filters GreaterThanOrEqual and LessThanOrEqual for the date column.
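
If the data currently exists with eventDTLocal stored as strings, a one-off rewrite along these lines could produce such a file (a sketch; the paths originalData and dataWithUnixTime are placeholders, and unix_timestamp interprets the strings in the session timezone):

from pyspark.sql import functions as F

#rewrite the parquet file with eventDTLocal as epoch milliseconds
#(unix_timestamp yields seconds as a long, so multiply by 1000)
spark.read.parquet("originalData") \
    .withColumn("eventDTLocal",
                F.unix_timestamp("eventDTLocal", "yyyy-MM-dd HH:mm:ss") * 1000) \
    .write.mode("overwrite").parquet("dataWithUnixTime")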