Spark 2.3.1 AWS EMR 不返回某些列的数据但在 Athena/Presto 和 Spectrum 中有效
Spark 2.3.1 AWS EMR not returning data for some columns yet works in Athena/Presto and Spectrum
我在 AWS EMR (Python 2.7.14)
上的 Spark 2.3.1 上使用 PySpark
spark = SparkSession \
.builder \
.appName("Python Spark SQL data source example") \
.config("hive.metastore.client.factory.class", "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory") \
.config("hive.exec.dynamic.partition", "true") \
.config("hive.exec.dynamic.partition.mode", "nonstrict") \
.config("spark.debug.maxToStringFields", 100) \
.enableHiveSupport() \
.getOrCreate()
spark.sql('select `message.country` from datalake.leads_notification where `message.country` is not null').show(10)
这 return 没有数据,找到 0 行。
上面 table 中每一行的每个值都是 returned Null。
数据存储在PARQUET.
当我在 AWS Athena/Presto 或 AWs Redshift Spectrum 上 运行 相同的 SQL 查询时,我得到所有列数据 return 正确编辑(大多数列值不是空)。
这是 Athena SQL 和 Redshift SQL 查询 return 的正确数据:
select "message.country" from datalake.leads_notification where "message.country" is not null limit 10;
我在所有情况下都使用 AWS Glue 目录。
上面的列未分区,但 table 在其他列上分区。我尝试使用 repair table,但没有帮助。
即 MSCK 修复 TABLE datalake.leads_notification
我试过 Schema Merge = True 是这样的:
spark = SparkSession \
.builder \
.appName("Python Spark SQL data source example") \
.config("hive.metastore.client.factory.class", "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory") \
.config("hive.exec.dynamic.partition", "true") \
.config("spark.sql.parquet.mergeSchema", "true") \
.config("hive.exec.dynamic.partition.mode", "nonstrict") \
.config("spark.debug.maxToStringFields", 200) \
.enableHiveSupport() \
.getOrCreate()
没有区别,一列的每个值仍然是空值,即使有些值不是空值。
此列作为最后一列添加到 table,因此大多数数据确实为空,但有些行不为空。该列最后列在目录中的列列表中,位于分区列的正上方。
然而 Athena/Presto 检索所有非空值 OK,Redshift Spectrum 也是如此,但是 EMR Spark 2.3.1 PySpark 将此列的所有值显示为 "null"。正确检索 Spark 中的所有其他列。
谁能帮我调试一下这个问题?
由于输出格式的原因,Hive Schema 很难在此处剪切和粘贴。
***CREATE TABLE datalake.leads_notification(
message.environment.siteorigin string,
dcpheader.dcploaddateutc string,
message.id int,
message.country string,
message.financepackage.id string,
message.financepackage.version string)
PARTITIONED BY (
partition_year_utc string,
partition_month_utc string,
partition_day_utc string,
job_run_guid string)
ROW FORMAT SERDE
'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
's3://blahblah/leads_notification/leads_notification/'
TBLPROPERTIES (
'CrawlerSchemaDeserializerVersion'='1.0',
'CrawlerSchemaSerializerVersion'='1.0',
'UPDATED_BY_CRAWLER'='weekly_datalake_crawler',
'averageRecordSize'='3136',
'classification'='parquet',
'compressionType'='none',
'objectCount'='2',
'recordCount'='897025',
'sizeKey'='1573529662',
'spark.sql.create.version'='2.2 or prior',
'spark.sql.sources.schema.numPartCols'='4',
'spark.sql.sources.schema.numParts'='3',
'spark.sql.sources.schema.partCol.0'='partition_year_utc',
'spark.sql.sources.schema.partCol.1'='partition_month_utc',
'spark.sql.sources.schema.partCol.2'='partition_day_utc',
'spark.sql.sources.schema.partCol.3'='job_run_guid',
'typeOfData'='file')***
最后 3 列在 Spark 中都有相同的问题:
message.country string,
message.financepackage.id string,
message.financepackage.version string
在 Athena/Presto 和使用相同目录的 Redshift Spectrum 中全部 return 正常。
我为我的编辑道歉。
谢谢你
进行第 5 步架构检查:
http://www.openkb.info/2015/02/how-to-build-and-use-parquet-tools-to.html
我敢打赌,镶木地板定义中的这些新列名要么是大写的(而其他列名是小写的),要么是镶木地板定义中的新列名是小写的(而其他列名是大写的)
spark = SparkSession \
.builder \
.appName("Python Spark SQL data source example") \
.config("hive.metastore.client.factory.class", "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory") \
.config("hive.exec.dynamic.partition", "true") \
.config("spark.sql.parquet.mergeSchema", "true") \
.config("spark.sql.hive.convertMetastoreParquet", "false") \
.config("hive.exec.dynamic.partition.mode", "nonstrict") \
.config("spark.debug.maxToStringFields", 200) \
.enableHiveSupport() \
.getOrCreate()
这是解决方案:注意
.config("spark.sql.hive.convertMetastoreParquet", "false")
模式列都是小写的,模式是由 AWS Glue 创建的,而不是由我的自定义代码创建的,所以我真的不知道是什么导致了问题,所以在创建模式时使用上面的可能是安全的默认设置不在您的直接控制之下。恕我直言,这是一个很大的陷阱,所以我希望这对将来的其他人有所帮助。
感谢 tooptoop4 指出文章:
我在 AWS EMR (Python 2.7.14)
上的 Spark 2.3.1 上使用 PySparkspark = SparkSession \
.builder \
.appName("Python Spark SQL data source example") \
.config("hive.metastore.client.factory.class", "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory") \
.config("hive.exec.dynamic.partition", "true") \
.config("hive.exec.dynamic.partition.mode", "nonstrict") \
.config("spark.debug.maxToStringFields", 100) \
.enableHiveSupport() \
.getOrCreate()
spark.sql('select `message.country` from datalake.leads_notification where `message.country` is not null').show(10)
这 return 没有数据,找到 0 行。 上面 table 中每一行的每个值都是 returned Null。 数据存储在PARQUET.
当我在 AWS Athena/Presto 或 AWs Redshift Spectrum 上 运行 相同的 SQL 查询时,我得到所有列数据 return 正确编辑(大多数列值不是空)。
这是 Athena SQL 和 Redshift SQL 查询 return 的正确数据:
select "message.country" from datalake.leads_notification where "message.country" is not null limit 10;
我在所有情况下都使用 AWS Glue 目录。 上面的列未分区,但 table 在其他列上分区。我尝试使用 repair table,但没有帮助。 即 MSCK 修复 TABLE datalake.leads_notification
我试过 Schema Merge = True 是这样的:
spark = SparkSession \
.builder \
.appName("Python Spark SQL data source example") \
.config("hive.metastore.client.factory.class", "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory") \
.config("hive.exec.dynamic.partition", "true") \
.config("spark.sql.parquet.mergeSchema", "true") \
.config("hive.exec.dynamic.partition.mode", "nonstrict") \
.config("spark.debug.maxToStringFields", 200) \
.enableHiveSupport() \
.getOrCreate()
没有区别,一列的每个值仍然是空值,即使有些值不是空值。
此列作为最后一列添加到 table,因此大多数数据确实为空,但有些行不为空。该列最后列在目录中的列列表中,位于分区列的正上方。
然而 Athena/Presto 检索所有非空值 OK,Redshift Spectrum 也是如此,但是 EMR Spark 2.3.1 PySpark 将此列的所有值显示为 "null"。正确检索 Spark 中的所有其他列。
谁能帮我调试一下这个问题?
由于输出格式的原因,Hive Schema 很难在此处剪切和粘贴。
***CREATE TABLE datalake.leads_notification(
message.environment.siteorigin string,
dcpheader.dcploaddateutc string,
message.id int,
message.country string,
message.financepackage.id string,
message.financepackage.version string)
PARTITIONED BY (
partition_year_utc string,
partition_month_utc string,
partition_day_utc string,
job_run_guid string)
ROW FORMAT SERDE
'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
's3://blahblah/leads_notification/leads_notification/'
TBLPROPERTIES (
'CrawlerSchemaDeserializerVersion'='1.0',
'CrawlerSchemaSerializerVersion'='1.0',
'UPDATED_BY_CRAWLER'='weekly_datalake_crawler',
'averageRecordSize'='3136',
'classification'='parquet',
'compressionType'='none',
'objectCount'='2',
'recordCount'='897025',
'sizeKey'='1573529662',
'spark.sql.create.version'='2.2 or prior',
'spark.sql.sources.schema.numPartCols'='4',
'spark.sql.sources.schema.numParts'='3',
'spark.sql.sources.schema.partCol.0'='partition_year_utc',
'spark.sql.sources.schema.partCol.1'='partition_month_utc',
'spark.sql.sources.schema.partCol.2'='partition_day_utc',
'spark.sql.sources.schema.partCol.3'='job_run_guid',
'typeOfData'='file')***
最后 3 列在 Spark 中都有相同的问题:
message.country string,
message.financepackage.id string,
message.financepackage.version string
在 Athena/Presto 和使用相同目录的 Redshift Spectrum 中全部 return 正常。
我为我的编辑道歉。
谢谢你
进行第 5 步架构检查: http://www.openkb.info/2015/02/how-to-build-and-use-parquet-tools-to.html
我敢打赌,镶木地板定义中的这些新列名要么是大写的(而其他列名是小写的),要么是镶木地板定义中的新列名是小写的(而其他列名是大写的)
spark = SparkSession \
.builder \
.appName("Python Spark SQL data source example") \
.config("hive.metastore.client.factory.class", "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory") \
.config("hive.exec.dynamic.partition", "true") \
.config("spark.sql.parquet.mergeSchema", "true") \
.config("spark.sql.hive.convertMetastoreParquet", "false") \
.config("hive.exec.dynamic.partition.mode", "nonstrict") \
.config("spark.debug.maxToStringFields", 200) \
.enableHiveSupport() \
.getOrCreate()
这是解决方案:注意
.config("spark.sql.hive.convertMetastoreParquet", "false")
模式列都是小写的,模式是由 AWS Glue 创建的,而不是由我的自定义代码创建的,所以我真的不知道是什么导致了问题,所以在创建模式时使用上面的可能是安全的默认设置不在您的直接控制之下。恕我直言,这是一个很大的陷阱,所以我希望这对将来的其他人有所帮助。 感谢 tooptoop4 指出文章: