带有 PySpark 的 AWS Glue - DynamicFrame 导出到 S3 中途失败,出现 UnsupportedOperationException
AWS Glue with PySpark - DynamicFrame export to S3 fails partway through with UnsupportedOperationException
首先,我应该说我一直在使用 AWS Glue Studio 来学习如何将 Glue 与 PySpark 结合使用,到目前为止,一切进展顺利。直到我遇到一个我无法理解(更不用说解决)的错误。可以在底部找到数据示例。
上下文
我所做的只是一个简单的数据转换。 Input S3 Bucket --> CustomTransform --> Output S3
。但是程序在导出部分数据后一直崩溃。我稍后也会提到它,但我什至尝试删除 CustomTransformation,但 S3 数据导出仍然失败,即使只是从一个 Bucket 到另一个 Bucket。
错误
这是我收到的错误的 Python 部分(从 CloudWatch 复制):
2021-03-26 09:03:09,200 ERROR [main] glue.ProcessLauncher (Logging.scala:logError(70)): Error from Python:Traceback (most recent call last):
File "/tmp/GlueTest.py", line 69, in <module>
DataSink0 = glueContext.write_dynamic_frame.from_options(frame = DataSource0, connection_type = "s3", format = "glueparquet", connection_options = {
"path": "s3://example-bucket-here/data/",
"compression": "snappy",
"partitionKeys": []
}, transformation_ctx = "DataSink0")
File "/opt/amazon/lib/python3.6/site-packages/awsglue/dynamicframe.py", line 640, in from_options
format_options, transformation_ctx)
File "/opt/amazon/lib/python3.6/site-packages/awsglue/context.py", line 242, in write_dynamic_frame_from_options
format, format_options, transformation_ctx)
File "/opt/amazon/lib/python3.6/site-packages/awsglue/context.py", line 265, in write_from_options
return sink.write(frame_or_dfc)
File "/opt/amazon/lib/python3.6/site-packages/awsglue/data_sink.py", line 35, in write
return self.writeFrame(dynamic_frame_or_dfc, info)
File "/opt/amazon/lib/python3.6/site-packages/awsglue/data_sink.py", line 31, in writeFrame
return DynamicFrame(self._jsink.pyWriteDynamicFrame(dynamic_frame._jdf, callsite(), info), dynamic_frame.glue_ctx, dynamic_frame.name + "_errors")
File "/opt/amazon/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
return f(*a, **kw)
File "/opt/amazon/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o85.pyWriteDynamicFrame.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 1.0 failed 4 times, most recent failure: Lost task 4.3 in stage 1.0 (TID 76, 172.36.109.34, executor 6): java.lang.UnsupportedOperationException: org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainDoubleDictionary
真正的谜题
最让我困惑的是这次崩溃发生在之后它已经将大部分数据导出到 S3。这会立即表明数据有问题,因为它会遇到一些损坏的(或格式不正确的)数据然后崩溃。
所以我查看了成功导出的数据和输入数据之间的差异,找到了所有未导出的行。没有什么让我感到奇怪或导出失败的原因。
当我 select S3 存储桶作为输入源时,了解 AWS Glue 正在推断模式可能会有所帮助。
我试过的
所以我尝试以 Glue 支持的所有不同格式导出数据,但 none 成功了。我还尝试跳过所有数据转换,只获取输入 S3 存储桶并直接导出到输出 S3 存储桶,但它仍然崩溃并出现相同的错误(实际上这就是我在上面包含的错误消息!)。
同样,所有这些都表明数据有问题,但我查看了所有未通过流程的数据(只有大约 180 条记录),它们看起来就像确实通过了。
为了完整性检查,我对其他一些(非常相似的)数据使用了 Input S3 --> Output S3 方法,它工作正常,基本上充当复制粘贴。
我也遇到了this article。但这并没有真正帮助,当我尝试更改输出格式以获取更多信息时,我遇到了同样的错误 - 没有额外信息。
有没有人可以帮助确定这里的问题?没有任何迹象表明这会崩溃。 我很乐意提供 Java 错误的其余部分,如果这对人们有帮助的话。
数据示例
这是我的数据:
Date ticker_name currency exchange_name instrument_type first_trade_date Amount
1612229400 0382.HK HKD HKG EQUITY 1563240600 0.049
1613140200 SO USD NYQ EQUITY 378657000 0.64
1613053800 SIGI USD NMS EQUITY 322151400 0.25
1614240000 SIGT.L GBp LSE EQUITY 828601200 1.68
1612249200 SIH.BE EUR BER EQUITY 1252044000 0.038
除日期(长)、first_trade_date(长)和金额(双)外,所有字段都是字符串。
当我调用 .printSchema()
时,我得到以下信息:
root
|-- Date: long
|-- currency: string
|-- exchange_name: string
|-- instrument_type: string
|-- first_trade_date: long
|-- Amount: double
解决方案
因此,如果有人遇到此问题,可能会令人沮丧,因为此错误似乎没有提供有关实际问题所在的任何信息。我仅有的线索之一是 this article。这表明我的架构有问题。
我不得不非常仔细地查看我的数据,最终发现只有当我 运行 将某些文件与其他文件结合使用时才会出现此错误。
事实证明,我的某些镶木地板文件的日期格式为 int
,而其他时候为 float
。此数据是在不同函数中使用 .to_parquet()
从 Pandas DataFrame 创建的,因此我不确定为什么数据类型不一致。
最让我困惑的是,为什么当我尝试将日期类型全部转换为 int
(如 所示)时,我仍然遇到错误。
无论如何,我的解决方案是修复 Pandas 输出数据的方式,并确保在 Glue 处理数据之前它始终将日期输出为整数。
首先,我应该说我一直在使用 AWS Glue Studio 来学习如何将 Glue 与 PySpark 结合使用,到目前为止,一切进展顺利。直到我遇到一个我无法理解(更不用说解决)的错误。可以在底部找到数据示例。
上下文
我所做的只是一个简单的数据转换。 Input S3 Bucket --> CustomTransform --> Output S3
。但是程序在导出部分数据后一直崩溃。我稍后也会提到它,但我什至尝试删除 CustomTransformation,但 S3 数据导出仍然失败,即使只是从一个 Bucket 到另一个 Bucket。
错误
这是我收到的错误的 Python 部分(从 CloudWatch 复制):
2021-03-26 09:03:09,200 ERROR [main] glue.ProcessLauncher (Logging.scala:logError(70)): Error from Python:Traceback (most recent call last):
File "/tmp/GlueTest.py", line 69, in <module>
DataSink0 = glueContext.write_dynamic_frame.from_options(frame = DataSource0, connection_type = "s3", format = "glueparquet", connection_options = {
"path": "s3://example-bucket-here/data/",
"compression": "snappy",
"partitionKeys": []
}, transformation_ctx = "DataSink0")
File "/opt/amazon/lib/python3.6/site-packages/awsglue/dynamicframe.py", line 640, in from_options
format_options, transformation_ctx)
File "/opt/amazon/lib/python3.6/site-packages/awsglue/context.py", line 242, in write_dynamic_frame_from_options
format, format_options, transformation_ctx)
File "/opt/amazon/lib/python3.6/site-packages/awsglue/context.py", line 265, in write_from_options
return sink.write(frame_or_dfc)
File "/opt/amazon/lib/python3.6/site-packages/awsglue/data_sink.py", line 35, in write
return self.writeFrame(dynamic_frame_or_dfc, info)
File "/opt/amazon/lib/python3.6/site-packages/awsglue/data_sink.py", line 31, in writeFrame
return DynamicFrame(self._jsink.pyWriteDynamicFrame(dynamic_frame._jdf, callsite(), info), dynamic_frame.glue_ctx, dynamic_frame.name + "_errors")
File "/opt/amazon/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
return f(*a, **kw)
File "/opt/amazon/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o85.pyWriteDynamicFrame.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 1.0 failed 4 times, most recent failure: Lost task 4.3 in stage 1.0 (TID 76, 172.36.109.34, executor 6): java.lang.UnsupportedOperationException: org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainDoubleDictionary
真正的谜题
最让我困惑的是这次崩溃发生在之后它已经将大部分数据导出到 S3。这会立即表明数据有问题,因为它会遇到一些损坏的(或格式不正确的)数据然后崩溃。
所以我查看了成功导出的数据和输入数据之间的差异,找到了所有未导出的行。没有什么让我感到奇怪或导出失败的原因。
当我 select S3 存储桶作为输入源时,了解 AWS Glue 正在推断模式可能会有所帮助。
我试过的
所以我尝试以 Glue 支持的所有不同格式导出数据,但 none 成功了。我还尝试跳过所有数据转换,只获取输入 S3 存储桶并直接导出到输出 S3 存储桶,但它仍然崩溃并出现相同的错误(实际上这就是我在上面包含的错误消息!)。
同样,所有这些都表明数据有问题,但我查看了所有未通过流程的数据(只有大约 180 条记录),它们看起来就像确实通过了。
为了完整性检查,我对其他一些(非常相似的)数据使用了 Input S3 --> Output S3 方法,它工作正常,基本上充当复制粘贴。
我也遇到了this article。但这并没有真正帮助,当我尝试更改输出格式以获取更多信息时,我遇到了同样的错误 - 没有额外信息。
有没有人可以帮助确定这里的问题?没有任何迹象表明这会崩溃。 我很乐意提供 Java 错误的其余部分,如果这对人们有帮助的话。
数据示例
这是我的数据:
Date ticker_name currency exchange_name instrument_type first_trade_date Amount
1612229400 0382.HK HKD HKG EQUITY 1563240600 0.049
1613140200 SO USD NYQ EQUITY 378657000 0.64
1613053800 SIGI USD NMS EQUITY 322151400 0.25
1614240000 SIGT.L GBp LSE EQUITY 828601200 1.68
1612249200 SIH.BE EUR BER EQUITY 1252044000 0.038
除日期(长)、first_trade_date(长)和金额(双)外,所有字段都是字符串。
当我调用 .printSchema()
时,我得到以下信息:
root
|-- Date: long
|-- currency: string
|-- exchange_name: string
|-- instrument_type: string
|-- first_trade_date: long
|-- Amount: double
解决方案
因此,如果有人遇到此问题,可能会令人沮丧,因为此错误似乎没有提供有关实际问题所在的任何信息。我仅有的线索之一是 this article。这表明我的架构有问题。
我不得不非常仔细地查看我的数据,最终发现只有当我 运行 将某些文件与其他文件结合使用时才会出现此错误。
事实证明,我的某些镶木地板文件的日期格式为 int
,而其他时候为 float
。此数据是在不同函数中使用 .to_parquet()
从 Pandas DataFrame 创建的,因此我不确定为什么数据类型不一致。
最让我困惑的是,为什么当我尝试将日期类型全部转换为 int
(如
无论如何,我的解决方案是修复 Pandas 输出数据的方式,并确保在 Glue 处理数据之前它始终将日期输出为整数。