NullPointerException in AWS Glue on dataframe_obj.count()

Good day,

I'm writing a Glue job on AWS to transform data. After joining two sets of data (producing a data frame of roughly 100 MB), I get a NullPointerException when retrieving the count on the resulting data frame. What makes this error hard to track down is that it only happens some of the time; other runs succeed.

The error is:

21/05/07 08:27:08 ERROR ProcessLauncher: Error from Python:Traceback (most recent call last):
  File "/tmp/transform.py", line 398, in <module>
    main()
  File "/tmp/transform.py", line 355, in main
    extract_data(context, df1_trans, df2_trans)
  File "/tmp/transform.py", line 264, in extract_data
    joined_count = joined.count()
  File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 522, in count
    return int(self._jdf.count())
  File "/opt/amazon/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/opt/amazon/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o689.count.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 in stage 13.0 failed 4 times, most recent failure: Lost task 7.3 in stage 13.0 (TID 8795, 172.35.98.112, executor 5): java.lang.NullPointerException
    at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.open(EmrFileSystem.java:207)
    at org.apache.parquet.hadoop.util.HadoopInputFile.newStream(HadoopInputFile.java:65)
    at org.apache.parquet.hadoop.ParquetFileReader.<init>(ParquetFileReader.java:642)
    at org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase.initialize(SpecificParquetRecordReaderBase.java:148)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initialize(VectorizedParquetRecordReader.java:131)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildReaderWithPartitionValues.apply(ParquetFileFormat.scala:418)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildReaderWithPartitionValues.apply(ParquetFileFormat.scala:352)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:124)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon.nextIterator(FileScanRDD.scala:177)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon.hasNext(FileScanRDD.scala:101)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.scan_nextBatch_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$$anon.hasNext(WholeStageCodegenExec.scala:636)
    at scala.collection.Iterator$$anon.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon.hasNext(Iterator.scala:462)
    at scala.collection.Iterator$$anon.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon.hasNext(Iterator.scala:409)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithKeys_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$$anon.hasNext(WholeStageCodegenExec.scala:636)
    at scala.collection.Iterator$$anon.hasNext(Iterator.scala:409)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

The problematic code (per the trace, the NullPointerException is thrown from EmrFileSystem.open while reading the Parquet sources for this stage):

    print("Applying join.")
    joined = (df1.join(df2, df1['ID'] == df2['ID'], how='inner')
              .select(df1["*"], df2["*"])
              .dropDuplicates(['ID', 'VAL']))
    joined_count = joined.count()    # NullPointerException here...
    print(f"Joined data: {joined_count}.") 
    write_out(context, joined, 'joined', "s3://<some_bucket>", "csv")

    # Retrieve data from joined data.
    tmp = (joined
           .withColumn('IDENT', joined['ID'])
           .withColumn('V1', joined['SOME_VALUE'])
           .withColumn('V2', joined['TIME'])
           .withColumn('V3', sf.lit('BLAH'))
           .withColumn('V4', sf.lit('3.14'))
           .select(['IDENT', 'V1', 'V2', 'V3', 'V4']))
    tmp_count = tmp.count()
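
For what it's worth, count() is the first action on joined, so everything up to this point, including the S3 Parquet scan that throws the NPE in the trace above, only runs at this line. Below is a minimal sketch (standard Spark API, not a fix for the NPE itself) of persisting the join so the count and the subsequent write do not each re-scan the sources:

# Sketch only: cache the joined frame so the count and the subsequent
# write do not each re-run the join and re-read the Parquet sources.
joined = joined.persist()
joined_count = joined.count()    # first action; materializes the cache
print(f"Joined data: {joined_count}.")
write_out(context, joined, 'joined', "s3://<some_bucket>", "csv")
joined.unpersist()               # release the cached blocks afterwards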
    

And the write-out code:

from awsglue.dynamicframe import DynamicFrame


def write_out(context, out, name, destination, destination_format):
    """
    Writes out the data as a single file.

    :param context: the GlueContext of the job.
    :param out: the Spark DataFrame to write.
    :param name: a label used for logging and as the DynamicFrame name.
    :param destination: the target path, e.g. an S3 URI.
    :param destination_format: the output format, e.g. "csv".
    :return: None
    """
    print(f"Writing {name} to {destination}.")

    # Repartition to a single partition so a single output file is written.
    glue_df = DynamicFrame.fromDF(out.repartition(1), context, name)
    context.write_dynamic_frame.from_options(
        frame=glue_df,
        connection_type="s3",
        connection_options={"path": destination},
        format=destination_format)

... any help or ideas on where to look would be appreciated.

In case anyone else runs into this as well: it can happen the moment the data is actually "collected", so when writing out the data partitions, retrieving counts, and so on.

The solution seems to be related to the Glue DynamicFrame used when loading the data. Switch to a Spark DataFrame and you're good to go.
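
For anyone wanting a concrete picture, a minimal sketch of that change is below. The original load code isn't shown above, so the database, table, and path names are placeholders, and the sources are assumed to be Parquet files based on the stack trace.

# Before (assumed, illustrative names): load via a Glue DynamicFrame
# and convert it to a Spark DataFrame.
df1 = (context.create_dynamic_frame
       .from_catalog(database="my_db", table_name="table_one")
       .toDF())

# After: read the source straight into a Spark DataFrame,
# bypassing the DynamicFrame entirely.
spark = context.spark_session
df1 = spark.read.parquet("s3://<some_bucket>/table_one/")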