Pyspark 从日期层次存储中读取选定的日期文件
Pyspark read selected date files from date hierarchy storage
我正在尝试使用 Pyspark 读取多个 CSV 文件,数据由 Amazon Kinesis Firehose 处理,因此它们以以下格式写入。
s3bucket/
YYYY/
mm/
dd/
hh/
files.gz
files.gz
files.gz
我实际上正在使用此代码阅读一整天(例如 15/01/2019),使用正则表达式:
data = spark.read.format("s3selectJson").options(compression="GZIP", multiline=True) \
.load("s3://s3bucket/2019/01/15/*.gz".format(datetime_object.strftime("%Y/%m/%d")))
我的问题是,如何在知道我想要的日期的情况下读取多天的数据?有没有一种自动方法,或者我应该为我需要的日期制作一个正则表达式?
编辑:
我正在寻找的是下面文档中 DataFrameWriter.partitionBy(*cols) 方法的反函数
http://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=regex#pyspark.sql.DataFrameWriter
我担心,没有办法做到这一点。
如果您的数据结构如下(month=, year=...),我们称之为分区。
s3bucket/
year=YYYY/
month=mm/
day=dd/
hour=hh/
files.gz
files.gz
files.gz
并且您可以轻松地加载数据(在您的情况下是特定日期)
data = spark.read.format("s3selectJson").options(compression="GZIP", multiline=True) \
.load("s3://s3bucket/")
data_days = data.filter("day in (10, 20)")
有了分区,Spark 只加载你的特定日期,而不是所有日期。
我没有找到它的功能,但是,这是一个解决方法:
datetime_object = datetime.strptime("2019-01-31", '%Y-%m-%d')
delta_days = 10
base_bucket = "s3://s3bucket/{}/*/*.gz"
bucket_names = []
for date in [datetime_object - timedelta(days=x) for x in range(0, delta_days)]:
bucket_names.append(base_bucket.format(date.strftime("%Y/%m/%d")))
幸运的是,.load()
函数将列表作为源路径的参数,因此我根据需要的日期生成每条路径并将其提供给加载函数。
data = spark.read.format("csv").options(compression="GZIP") \
.load(bucket_names)
我正在尝试使用 Pyspark 读取多个 CSV 文件,数据由 Amazon Kinesis Firehose 处理,因此它们以以下格式写入。
s3bucket/
YYYY/
mm/
dd/
hh/
files.gz
files.gz
files.gz
我实际上正在使用此代码阅读一整天(例如 15/01/2019),使用正则表达式:
data = spark.read.format("s3selectJson").options(compression="GZIP", multiline=True) \
.load("s3://s3bucket/2019/01/15/*.gz".format(datetime_object.strftime("%Y/%m/%d")))
我的问题是,如何在知道我想要的日期的情况下读取多天的数据?有没有一种自动方法,或者我应该为我需要的日期制作一个正则表达式?
编辑:
我正在寻找的是下面文档中 DataFrameWriter.partitionBy(*cols) 方法的反函数
http://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=regex#pyspark.sql.DataFrameWriter
我担心,没有办法做到这一点。
如果您的数据结构如下(month=, year=...),我们称之为分区。
s3bucket/
year=YYYY/
month=mm/
day=dd/
hour=hh/
files.gz
files.gz
files.gz
并且您可以轻松地加载数据(在您的情况下是特定日期)
data = spark.read.format("s3selectJson").options(compression="GZIP", multiline=True) \
.load("s3://s3bucket/")
data_days = data.filter("day in (10, 20)")
有了分区,Spark 只加载你的特定日期,而不是所有日期。
我没有找到它的功能,但是,这是一个解决方法:
datetime_object = datetime.strptime("2019-01-31", '%Y-%m-%d')
delta_days = 10
base_bucket = "s3://s3bucket/{}/*/*.gz"
bucket_names = []
for date in [datetime_object - timedelta(days=x) for x in range(0, delta_days)]:
bucket_names.append(base_bucket.format(date.strftime("%Y/%m/%d")))
幸运的是,.load()
函数将列表作为源路径的参数,因此我根据需要的日期生成每条路径并将其提供给加载函数。
data = spark.read.format("csv").options(compression="GZIP") \
.load(bucket_names)