使用 glueContext.write_dynamic_frame.from_options 将 AWS Glue 导出到镶木地板问题

AWS Glue export to parquet issue using glueContext.write_dynamic_frame.from_options

我有以下问题。

以下代码由 AWS Glue 自动生成。

它的任务是从 Athena 获取数据(由 .csv @ S3 备份)并将数据转换为 Parquet。

该代码适用于参考航班数据集和一些相对较大的表 (~100 Gb)。

然而,在大多数情况下它 returns 错误,这并不能告诉我太多。

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkConf, SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

conf = (SparkConf()
    .set("spark.driver.maxResultSize", "8g"))

sc = SparkContext(conf=conf)
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "XXX", table_name = "csv_impressions", transformation_ctx = "datasource0")

applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("event time", "long", "event_time", "long"), ("user id", "string", "user_id", "string"), ("advertiser id", "long", "advertiser_id", "long"), ("campaign id", "long", "campaign_id", "long")], transformation_ctx = "applymapping1")

resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2")

dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")

datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://xxxx"}, format = "parquet", transformation_ctx = "datasink4")
job.commit()

AWS Glue 识别的错误消息是:

An error occurred while calling o72.pyWriteDynamicFrame

日志文件还包含:

Job aborted due to stage failure: ... Task failed while writing rows

知道如何找出失败的原因吗?

或者它可能是什么?

第 1 部分:确定问题

如何找到导致问题的解决方案是将输出从 .parquet 切换到 .csv 并删除 ResolveChoiceDropNullFields(因为它是自动建议的通过胶水 .parquet):

datasink2 = glueContext.write_dynamic_frame.from_options(frame = applymapping1, connection_type = "s3", connection_options = {"path": "s3://xxxx"}, format = "csv", transformation_ctx = "datasink2")
job.commit()

它产生了更详细的错误信息:

An error occurred while calling o120.pyWriteDynamicFrame. Job aborted due to stage failure: Task 5 in stage 0.0 failed 4 times, most recent failure: Lost task 5.3 in stage 0.0 (TID 182, ip-172-31-78-99.ec2.internal, executor 15): com.amazonaws.services.glue.util.FatalException: Unable to parse file: xxxx1.csv.gz

错误消息中提到的文件 xxxx1.csv.gz 对于 Glue 来说似乎太大了(~100Mb .gzip 和 ~350Mb 未压缩 .csv)。

第 2 部分:问题的真正根源和修复

如第一部分所述,多亏了导出到 .csv,才可能识别出错误的文件。

通过将 .csv 加载到 R 中进行的进一步调查表明,其中一列包含单个 string 记录,而该列的所有其他值都是 longNULL

在 R 中删除此值并将数据重新上传到 S3 后,问题消失了。

注意 #1:该列已在 Athena 中声明 string,因此我将此行为视为错误

注意#2:问题的本质不是数据的大小。我已经成功处理了高达 200Mb .csv.gz 的文件,大约相当于 600 Mb .csv.

请使用数据目录中更新的 table 架构。

我也遇到过同样的错误。在我的例子中,爬虫在数据库中创建了另一个 table 相同的文件。我指的是旧的。如果爬虫一次又一次地爬行相同的路径并在数据目录中创建不同的架构 table,就会发生这种情况。所以粘合工作没有找到 table 名称和架构。从而给出这个错误。

此外,您可以将 DeleteBehavior: "LOG" 更改为 DeleteBehavior: "DELETE_IN_DATABASE"