在 Azure Databricks 中的日期范围内读取镶木地板文件的有效方法
Efficient way of reading parquet files between a date range in Azure Databricks
我想知道下面的伪代码是否是从 PySpark (Azure Databricks) 读取存储在 Azure Data Lake 中的日期范围之间的多个 parquet 文件的有效方法。注意:镶木地板文件未按日期分区。
我使用 uat/EntityName/2019/01/01/EntityName_2019_01_01_HHMMSS.parquet 约定在 ADL 中存储数据,正如 Nathan Marz 在大数据一书中所建议的那样,稍作修改(使用 2019 而不是 year=2019)。
使用 * 通配符读取所有数据:
df = spark.read.parquet(uat/EntityName/*/*/*/*)
添加一个列 FileTimestamp,使用字符串操作从 EntityName_2019_01_01_HHMMSS.parquet 中提取时间戳并转换为 TimestampType()
df.withColumn(add timestamp column)
使用过滤器获取相关数据:
start_date = '2018-12-15 00:00:00'
end_date = '2019-02-15 00:00:00'
df.filter(df.FileTimestamp >= start_date).filter(df.FileTimestamp < end_date)
基本上我使用 PySpark 来模拟 U-SQL:
中可用的简洁语法
@rs =
EXTRACT
user string,
id string,
__date DateTime
FROM
"/input/data-{__date:yyyy}-{__date:MM}-{__date:dd}.csv"
USING Extractors.Csv();
@rs =
SELECT *
FROM @rs
WHERE
date >= System.DateTime.Parse("2016/1/1") AND
date < System.DateTime.Parse("2016/2/1");
划分数据的正确方法是对数据使用年=2019、月=01 等格式。
当您使用以下过滤器查询此数据时:
df.filter(df.year >= myYear)
那么Spark只会读取相关的文件夹。
筛选栏名称必须准确出现在文件夹名称中,这一点非常重要。请注意,当您使用 Spark 写入分区数据时(例如按年、月、日),它不会将分区列写入 parquet 文件。它们是从路径中推断出来的。这确实意味着您的数据框在编写时将需要它们。当您从分区源读取时,它们也将作为列返回。
如果您无法更改文件夹结构,您始终可以使用正则表达式或 Glob 手动减少 Spark 读取的文件夹 - 本文应提供更多上下文 。但显然这更加手动和复杂。
更新:更多示例
也来自"Spark - The Definitive Guide: Big Data Processing Made Simple"
比尔·钱伯斯:
Partitioning is a tool that allows you to control what data is stored
(and where) as you write it. When you write a file to a partitioned
directory (or table), you basically encode a column as a folder. What
this allows you to do is skip lots of data when you go to read it in
later, allowing you to read in only the data relevant to your problem
instead of having to scan the complete dataset.
...
This is probably the lowest-hanging optimization that you can use when
you have a table that readers frequently filter by before
manipulating. For instance, date is particularly common for a
partition because, downstream, often we want to look at only the
previous week’s data (instead of scanning the entire list of records).
我想知道下面的伪代码是否是从 PySpark (Azure Databricks) 读取存储在 Azure Data Lake 中的日期范围之间的多个 parquet 文件的有效方法。注意:镶木地板文件未按日期分区。
我使用 uat/EntityName/2019/01/01/EntityName_2019_01_01_HHMMSS.parquet 约定在 ADL 中存储数据,正如 Nathan Marz 在大数据一书中所建议的那样,稍作修改(使用 2019 而不是 year=2019)。
使用 * 通配符读取所有数据:
df = spark.read.parquet(uat/EntityName/*/*/*/*)
添加一个列 FileTimestamp,使用字符串操作从 EntityName_2019_01_01_HHMMSS.parquet 中提取时间戳并转换为 TimestampType()
df.withColumn(add timestamp column)
使用过滤器获取相关数据:
start_date = '2018-12-15 00:00:00'
end_date = '2019-02-15 00:00:00'
df.filter(df.FileTimestamp >= start_date).filter(df.FileTimestamp < end_date)
基本上我使用 PySpark 来模拟 U-SQL:
中可用的简洁语法@rs =
EXTRACT
user string,
id string,
__date DateTime
FROM
"/input/data-{__date:yyyy}-{__date:MM}-{__date:dd}.csv"
USING Extractors.Csv();
@rs =
SELECT *
FROM @rs
WHERE
date >= System.DateTime.Parse("2016/1/1") AND
date < System.DateTime.Parse("2016/2/1");
划分数据的正确方法是对数据使用年=2019、月=01 等格式。
当您使用以下过滤器查询此数据时:
df.filter(df.year >= myYear)
那么Spark只会读取相关的文件夹。
筛选栏名称必须准确出现在文件夹名称中,这一点非常重要。请注意,当您使用 Spark 写入分区数据时(例如按年、月、日),它不会将分区列写入 parquet 文件。它们是从路径中推断出来的。这确实意味着您的数据框在编写时将需要它们。当您从分区源读取时,它们也将作为列返回。
如果您无法更改文件夹结构,您始终可以使用正则表达式或 Glob 手动减少 Spark 读取的文件夹 - 本文应提供更多上下文
更新:更多示例
也来自"Spark - The Definitive Guide: Big Data Processing Made Simple" 比尔·钱伯斯:
Partitioning is a tool that allows you to control what data is stored (and where) as you write it. When you write a file to a partitioned directory (or table), you basically encode a column as a folder. What this allows you to do is skip lots of data when you go to read it in later, allowing you to read in only the data relevant to your problem instead of having to scan the complete dataset. ...
This is probably the lowest-hanging optimization that you can use when you have a table that readers frequently filter by before manipulating. For instance, date is particularly common for a partition because, downstream, often we want to look at only the previous week’s data (instead of scanning the entire list of records).