Spark 读取 Partitioned avro 比指向确切位置慢得多
Spark reading Partitioned avro significantly slower than pointing to exact location
我正在尝试读取分区的 Avro 数据,这些数据是根据年、月和日进行分区的,这似乎比直接将其指向路径要慢得多。
在物理计划中,我可以看到分区过滤器正在传递,因此它没有扫描整组目录,但速度仍然很慢。
例如像这样读取分区数据
profitLossPath="abfss://raw@"+datalakename+".dfs.core.windows.net/datawarehouse/CommercialDM.ProfitLoss/"
profitLoss = spark.read.\
format("com.databricks.spark.avro").\
option("header", "false").\
option("inferSchema", "false").load(profitLossPath)
profitLoss.createOrReplaceTempView("ProfitLosstt")
df=sqlContext.sql("SELECT * \
FROM ProfitLosstt \
where Year= " + year + " and Month=" + month_nz + " and Day=" + date_nz )
大约需要 3 分钟
而这是我使用字符串生成器指向确切位置的地方,在 2 秒内完成
profitLossPath="abfss://raw@"+datalakename+".dfs.core.windows.net/datawarehouse/CommercialDM.ProfitLoss/Year=" +year +"/Month=" + month_nz + "/Day=" + date_nz
profitLoss = spark.read.\
format("com.databricks.spark.avro").\
option("header", "false").\
option("inferSchema", "false").load(profitLossPath)
profitLoss.createOrReplaceTempView("ProfitLosstt")
df=sqlContext.sql("SELECT * \
FROM ProfitLosstt "
)
display(df)
查看第一个(较慢)的物理计划确实表明分区过滤器已通过
什么可以解释发现阶段花了这么长时间?
有什么问题,我可以详细说明。
好的,缓慢的原因是因为 InMemoryFileIndex 的构建。
虽然进行了分区修剪,但 Spark 需要了解分区和文件信息,而这正是它需要执行该步骤的地方。
S.O post 详细说明:here
所以,当时的想法是创建一个外部 table,以便构建此信息,我使用这样的脚本来完成(我使用了一个内联模式,你可以使用一个模式文件,如果你有一个)
create external table ProfitLossAvro
partitioned by (Year int, Month int, Day int)
ROW FORMAT SERDE
'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
Stored As
inputformat 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
outputformat 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
Location 'abfss://raw@datalakename.dfs.core.windows.net/datawarehouse/CommercialDM.ProfitLoss/'
TBLPROPERTIES (
'avro.schema.literal'='{
"name": "Microsoft.Hadoop.Avro.Specifications.ProfitLoss",
"type": "record",
"fields": [{ "name":"MK_DatesID_TradeDate", "type":["int", "null"]},{ "name":"MK_UCRAccountsID_AccountID", "type":["int", "null"]},{ "name":"MK_ProductCategoriesID_ProductCategoryID", "type":["int", "null"]},{ "name":"CurrencyCode", "type":["string", "null"]},{ "name":"ProfitLoss", "type":["double", "null"]},{ "name":"MK_PnLAmountTypesID_PLBookingTypeID", "type":["int", "null"]}]
}');
但是如果您随后查询此 table,您将获得 0 行。这是因为现有分区不会自动添加。所以,你可以使用
msck repair table ProfitLossAvro
并且每次将数据添加到数据湖时,您都可以添加分区。
像这样:-
ALTER TABLE ProfitLossAvro ADD PARTITION (Year=2020, Month=6, Day=26)
如果您随后使用如下命令查询数据,它将工作得更快
df=sqlContext.sql("select * \
from ProfitLossAvro \
where Year=" + year + " and Month=" + month_nz + " and Day=" + date_nz)
display(df)
我正在尝试读取分区的 Avro 数据,这些数据是根据年、月和日进行分区的,这似乎比直接将其指向路径要慢得多。 在物理计划中,我可以看到分区过滤器正在传递,因此它没有扫描整组目录,但速度仍然很慢。
例如像这样读取分区数据
profitLossPath="abfss://raw@"+datalakename+".dfs.core.windows.net/datawarehouse/CommercialDM.ProfitLoss/"
profitLoss = spark.read.\
format("com.databricks.spark.avro").\
option("header", "false").\
option("inferSchema", "false").load(profitLossPath)
profitLoss.createOrReplaceTempView("ProfitLosstt")
df=sqlContext.sql("SELECT * \
FROM ProfitLosstt \
where Year= " + year + " and Month=" + month_nz + " and Day=" + date_nz )
大约需要 3 分钟
而这是我使用字符串生成器指向确切位置的地方,在 2 秒内完成
profitLossPath="abfss://raw@"+datalakename+".dfs.core.windows.net/datawarehouse/CommercialDM.ProfitLoss/Year=" +year +"/Month=" + month_nz + "/Day=" + date_nz
profitLoss = spark.read.\
format("com.databricks.spark.avro").\
option("header", "false").\
option("inferSchema", "false").load(profitLossPath)
profitLoss.createOrReplaceTempView("ProfitLosstt")
df=sqlContext.sql("SELECT * \
FROM ProfitLosstt "
)
display(df)
查看第一个(较慢)的物理计划确实表明分区过滤器已通过
什么可以解释发现阶段花了这么长时间?
有什么问题,我可以详细说明。
好的,缓慢的原因是因为 InMemoryFileIndex 的构建。
虽然进行了分区修剪,但 Spark 需要了解分区和文件信息,而这正是它需要执行该步骤的地方。 S.O post 详细说明:here
所以,当时的想法是创建一个外部 table,以便构建此信息,我使用这样的脚本来完成(我使用了一个内联模式,你可以使用一个模式文件,如果你有一个)
create external table ProfitLossAvro
partitioned by (Year int, Month int, Day int)
ROW FORMAT SERDE
'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
Stored As
inputformat 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
outputformat 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
Location 'abfss://raw@datalakename.dfs.core.windows.net/datawarehouse/CommercialDM.ProfitLoss/'
TBLPROPERTIES (
'avro.schema.literal'='{
"name": "Microsoft.Hadoop.Avro.Specifications.ProfitLoss",
"type": "record",
"fields": [{ "name":"MK_DatesID_TradeDate", "type":["int", "null"]},{ "name":"MK_UCRAccountsID_AccountID", "type":["int", "null"]},{ "name":"MK_ProductCategoriesID_ProductCategoryID", "type":["int", "null"]},{ "name":"CurrencyCode", "type":["string", "null"]},{ "name":"ProfitLoss", "type":["double", "null"]},{ "name":"MK_PnLAmountTypesID_PLBookingTypeID", "type":["int", "null"]}]
}');
但是如果您随后查询此 table,您将获得 0 行。这是因为现有分区不会自动添加。所以,你可以使用
msck repair table ProfitLossAvro
并且每次将数据添加到数据湖时,您都可以添加分区。 像这样:-
ALTER TABLE ProfitLossAvro ADD PARTITION (Year=2020, Month=6, Day=26)
如果您随后使用如下命令查询数据,它将工作得更快
df=sqlContext.sql("select * \
from ProfitLossAvro \
where Year=" + year + " and Month=" + month_nz + " and Day=" + date_nz)
display(df)