从文件系统有条件地加载分区
Conditional loading of partitions from file-system
我知道有人对 pySparks .load()
中的通配符有疑问 - 函数如 or 。
无论如何,我发现 questions/answers 中的 none 处理了我的变体。
上下文
在 pySpark 中,我想直接从 HDFS 加载文件,因为我必须为 Spark 2 使用 databricks avro-library。3.x。我是这样做的:
partition_stamp = "202104"
df = spark.read.format("com.databricks.spark.avro") \
.load(f"/path/partition={partition_stamp}*") \
.select("...")
如您所见,分区源自格式为 yyyyMMdd
.
的时间戳
问题
目前我只获得了 2021 年 4 月使用的所有分区 (partition_stamp = "202104"
)。
但是,我需要从 2021 年 4 月开始的所有分区。
用伪代码编写,我需要类似这样的解决方案:
.load(f"/path/partition >= {partition_stamp}*")
由于实际上存在数百个分区,因此以任何需要硬编码的方式进行分区都是没有用的。
所以我的问题是:是否有条件文件加载功能?
据我了解,在 .load()
函数中仅存在以下动态处理路径的选项:
*: Wildcard for any character or sequence of characters until the end of the line or a new sub-directory ('/') -> (/path/20200*)
[1-3]: Regex-like inclusion of a defined character-range -> (/path/20200[1-3]/...)
{1,2,3}: Set-like inclusion of a defined set of characters -> (/path/20200{1,2,3}/...)
因此,回答我的问题:条件file-loading没有built-in函数。
无论如何,我想为您提供我的解决方案:
import pandas as pd # Utilize pandas date-functions
partition_stamp = ",".join((set(
str(_range.year) + "{:02}".format(_range.month)
for _range in pd.date_range(start=start_date, end=end_date, freq='D')
)))
df = spark.read.format("com.databricks.spark.avro") \
.load(f"/path/partition={{{partition_stamp}}}*") \
.select("...")
这样,对于给定的开始和 end-date 动态生成 yyyyMM
格式的时间戳限制,并且 string-based .load()
仍然可用。
我知道有人对 pySparks .load()
中的通配符有疑问 - 函数如
上下文
在 pySpark 中,我想直接从 HDFS 加载文件,因为我必须为 Spark 2 使用 databricks avro-library。3.x。我是这样做的:
partition_stamp = "202104"
df = spark.read.format("com.databricks.spark.avro") \
.load(f"/path/partition={partition_stamp}*") \
.select("...")
如您所见,分区源自格式为 yyyyMMdd
.
问题
目前我只获得了 2021 年 4 月使用的所有分区 (partition_stamp = "202104"
)。
但是,我需要从 2021 年 4 月开始的所有分区。
用伪代码编写,我需要类似这样的解决方案:
.load(f"/path/partition >= {partition_stamp}*")
由于实际上存在数百个分区,因此以任何需要硬编码的方式进行分区都是没有用的。
所以我的问题是:是否有条件文件加载功能?
据我了解,在 .load()
函数中仅存在以下动态处理路径的选项:
*: Wildcard for any character or sequence of characters until the end of the line or a new sub-directory ('/') -> (/path/20200*)
[1-3]: Regex-like inclusion of a defined character-range -> (/path/20200[1-3]/...)
{1,2,3}: Set-like inclusion of a defined set of characters -> (/path/20200{1,2,3}/...)
因此,回答我的问题:条件file-loading没有built-in函数。
无论如何,我想为您提供我的解决方案:
import pandas as pd # Utilize pandas date-functions
partition_stamp = ",".join((set(
str(_range.year) + "{:02}".format(_range.month)
for _range in pd.date_range(start=start_date, end=end_date, freq='D')
)))
df = spark.read.format("com.databricks.spark.avro") \
.load(f"/path/partition={{{partition_stamp}}}*") \
.select("...")
这样,对于给定的开始和 end-date 动态生成 yyyyMM
格式的时间戳限制,并且 string-based .load()
仍然可用。