Pyspark 从日期层次存储中读取选定的日期文件

Pyspark read selected date files from date hierarchy storage

我正在尝试使用 Pyspark 读取多个 CSV 文件,数据由 Amazon Kinesis Firehose 处理,因此它们以以下格式写入。

s3bucket/ 
    YYYY/
        mm/
            dd/
                hh/
                    files.gz
                    files.gz
                    files.gz

我实际上正在使用此代码阅读一整天(例如 15/01/2019),使用正则表达式:

data = spark.read.format("s3selectJson").options(compression="GZIP", multiline=True) \
    .load("s3://s3bucket/2019/01/15/*.gz".format(datetime_object.strftime("%Y/%m/%d")))

我的问题是,如何在知道我想要的日期的情况下读取多天的数据?有没有一种自动方法,或者我应该为我需要的日期制作一个正则表达式?

编辑:
我正在寻找的是下面文档中 DataFrameWriter.partitionBy(*cols) 方法的反函数
http://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=regex#pyspark.sql.DataFrameWriter

我担心,没有办法做到这一点。

如果您的数据结构如下(month=, year=...),我们称之为分区。

s3bucket/ 
    year=YYYY/
        month=mm/
            day=dd/
                hour=hh/
                    files.gz
                    files.gz
                    files.gz

并且您可以轻松地加载数据(在您的情况下是特定日期)

data = spark.read.format("s3selectJson").options(compression="GZIP", multiline=True) \
  .load("s3://s3bucket/")

data_days = data.filter("day in (10, 20)")

有了分区,Spark 只加载你的特定日期,而不是所有日期。

我没有找到它的功能,但是,这是一个解决方法:

datetime_object = datetime.strptime("2019-01-31", '%Y-%m-%d')
delta_days = 10
base_bucket = "s3://s3bucket/{}/*/*.gz"
bucket_names = []
for date in [datetime_object - timedelta(days=x) for x in range(0, delta_days)]:
    bucket_names.append(base_bucket.format(date.strftime("%Y/%m/%d")))

幸运的是,.load() 函数将列表作为源路径的参数,因此我根据需要的日期生成每条路径并将其提供给加载函数。

data = spark.read.format("csv").options(compression="GZIP") \
        .load(bucket_names)