从文件系统有条件地加载分区

Conditional loading of partitions from file-system

我知道有人对 pySparks .load() 中的通配符有疑问 - 函数如 or 。 无论如何,我发现 questions/answers 中的 none 处理了我的变体。

上下文

在 pySpark 中,我想直接从 HDFS 加载文件,因为我必须为 Spark 2 使用 databricks avro-library。3.x。我是这样做的:

partition_stamp = "202104"

df = spark.read.format("com.databricks.spark.avro") \
        .load(f"/path/partition={partition_stamp}*") \
        .select("...")

如您所见,分区源自格式为 yyyyMMdd.

的时间戳

问题

目前我只获得了 2021 年 4 月使用的所有分区 (partition_stamp = "202104")。 但是,我需要从 2021 年 4 月开始的所有分区。

用伪代码编写,我需要类似这样的解决方案:

.load(f"/path/partition >= {partition_stamp}*")

由于实际上存在数百个分区,因此以任何需要硬编码的方式进行分区都是没有用的。


所以我的问题是:是否有条件文件加载功能?

据我了解,在 .load() 函数中仅存在以下动态处理路径的选项:

*:  Wildcard for any character or sequence of characters until the end of the line or a new sub-directory ('/') -> (/path/20200*)
[1-3]: Regex-like inclusion of a defined character-range -> (/path/20200[1-3]/...)
{1,2,3}: Set-like inclusion of a defined set of characters -> (/path/20200{1,2,3}/...)

因此,回答我的问题:条件file-loading没有built-in函数。




无论如何,我想为您提供我的解决方案:

import pandas as pd # Utilize pandas date-functions

partition_stamp = ",".join((set(
                        str(_range.year) + "{:02}".format(_range.month) 
                        for _range in pd.date_range(start=start_date, end=end_date, freq='D')
                 )))

df = spark.read.format("com.databricks.spark.avro") \
        .load(f"/path/partition={{{partition_stamp}}}*") \
        .select("...")

这样,对于给定的开始和 end-date 动态生成 yyyyMM 格式的时间戳限制,并且 string-based .load() 仍然可用。