NullPointerException in AWS Glue on dataframe_obj.count()
Good day
I am writing a Glue job on AWS to transform data. After joining two sets of data (which produces a DataFrame of roughly 100 MB), I get a NullPointerException when retrieving the count on that DataFrame. What makes this error hard to track down is that it only occurs intermittently - sometimes the job succeeds.
The error is:
21/05/07 08:27:08 ERROR ProcessLauncher: Error from Python:Traceback (most recent call last):
File "/tmp/transform.py", line 398, in <module>
main()
File "/tmp/transform.py", line 355, in main
extract_data(context, df1_trans, df2_trans)
File "/tmp/transform.py", line 264, in extract_data
joined_count = joined.count()
File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 522, in count
return int(self._jdf.count())
File "/opt/amazon/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
return f(*a, **kw)
File "/opt/amazon/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o689.count.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 in stage 13.0 failed 4 times, most recent failure: Lost task 7.3 in stage 13.0 (TID 8795, 172.35.98.112, executor 5): java.lang.NullPointerException
at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.open(EmrFileSystem.java:207)
at org.apache.parquet.hadoop.util.HadoopInputFile.newStream(HadoopInputFile.java:65)
at org.apache.parquet.hadoop.ParquetFileReader.<init>(ParquetFileReader.java:642)
at org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase.initialize(SpecificParquetRecordReaderBase.java:148)
at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initialize(VectorizedParquetRecordReader.java:131)
at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildReaderWithPartitionValues.apply(ParquetFileFormat.scala:418)
at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildReaderWithPartitionValues.apply(ParquetFileFormat.scala:352)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:124)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon.nextIterator(FileScanRDD.scala:177)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon.hasNext(FileScanRDD.scala:101)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.scan_nextBatch_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$$anon.hasNext(WholeStageCodegenExec.scala:636)
at scala.collection.Iterator$$anon.hasNext(Iterator.scala:409)
at scala.collection.Iterator$$anon.hasNext(Iterator.scala:409)
at scala.collection.Iterator$$anon.hasNext(Iterator.scala:409)
at scala.collection.Iterator$$anon.hasNext(Iterator.scala:409)
at scala.collection.Iterator$$anon.hasNext(Iterator.scala:462)
at scala.collection.Iterator$$anon.hasNext(Iterator.scala:409)
at scala.collection.Iterator$$anon.hasNext(Iterator.scala:409)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithKeys_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$$anon.hasNext(WholeStageCodegenExec.scala:636)
at scala.collection.Iterator$$anon.hasNext(Iterator.scala:409)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
The offending code:
print("Applying join.")
joined = (df1.join(df2, df1['ID'] == df2['ID'], how='inner')
.select(df1["*"], df2["*"])
.dropDuplicates(['ID', 'VAL']))
extract_data joined_count = joined.count() # Nullpointer exception here...
print(f"Joined data: {joined_count}.")
write_out(context, joined, 'joined', "s3://<some_bucket>", "csv")
# Retrieve data from joined data.
tmp = (joined
.withColumn('IDENT', joined['ID'])
.withColumn('V1', joined['SOME_VALUE'])
.withColumn('V2', joined['TIME'])
.withColumn('V3', sf.lit('BLAH'))
.withColumn('V4', sf.lit('3.14'))
.select(['IDENT', 'V1', 'V2', 'V3', 'V4']))
tmp_count = tmp.count()
And the write-out code:
def write_out(context, out, name, destination, destination_format):
    """
    Writes out the data as a single file.

    :param context: the GlueContext of the job.
    :param out: the Spark DataFrame to write.
    :param name: a label for the data set, used in logging.
    :param destination: the S3 path to write to.
    :param destination_format: the output format, e.g. "csv".
    :return: None
    """
    print(f"Writing {name} to {destination}.")
    glue_df = DynamicFrame.fromDF(out.repartition(1), context, name)
    context.write_dynamic_frame.from_options(
        frame=glue_df,
        connection_type="s3",
        connection_options={"path": destination},
        format=destination_format)
...any help with or ideas about where to look would be much appreciated.
In case anyone else runs into this: it can happen at the moment the data is "collected" - so when writing out the data, taking counts, and so on.
The fix seems to be related to the Glue DynamicFrame used when loading the data: switch to a plain Spark DataFrame for the load and you're done.
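In case it helps, here is a minimal sketch of that change, assuming the input sits in Parquet on S3 and was originally loaded from the Glue catalog; the database, table, and path names below are placeholders, not the ones from the original job:

from awsglue.context import GlueContext
from pyspark.context import SparkContext

sc = SparkContext.getOrCreate()
glue_context = GlueContext(sc)
spark = glue_context.spark_session

# Before (assumed): load through a Glue DynamicFrame, then convert to Spark.
# dyf1 = glue_context.create_dynamic_frame.from_catalog(
#     database="my_db", table_name="my_table_1")
# df1 = dyf1.toDF()

# After: read the Parquet data directly as a Spark DataFrame.
df1 = spark.read.parquet("s3://my-bucket/input/df1/")
df2 = spark.read.parquet("s3://my-bucket/input/df2/")

The join, the count and write_out stay exactly as above; only the load changes.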