pyspark write failed with StackOverflowError

I am converting fixed-width files to Parquet in AWS Glue. The data has roughly 1,600 columns and roughly 3,000 rows. When I try to write the Spark DataFrame as Parquet, I hit a StackOverflowError.
The same problem occurs when I run actions such as count() or show(). I have tried calling cache() and repartition(), but I still see the error.

If I reduce the number of columns to 500, the code works.

Please help.

Below is my code:

    data_df = spark.read.text(input_path)

    schema_df = pd.read_json(schema_path)
    df = data_df

    # Add one column per fixed-width field defined in the schema JSON
    for r in schema_df.itertuples():
        df = df.withColumn(
            str(r.name), df.value.substr(int(r.start), int(r.length))
        )
    df = df.drop("value")

    df.write.mode("overwrite").option("compression", "gzip").parquet(output_path)  # FAILING HERE

Stack trace below:

2021-11-10 05:00:13,542 ERROR [main] glue.ProcessLauncher (Logging.scala:logError(70)): Error from Python:Traceback (most recent call last):
  File "/tmp/conv_fw_2_pq.py", line 148, in <module>
    partition_ts=parsed_args.partition_timestamp,
  File "/tmp/conv_fw_2_pq.py", line 125, in process_file
    df.write.mode("overwrite").option("compression", "gzip").parquet(output_path)
  File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 839, in parquet
    self._jwrite.parquet(path)
  File "/opt/amazon/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/opt/amazon/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o7066.parquet.
: java.lang.StackOverflowError
    at scala.collection.TraversableLike$$anonfun$flatMap.apply(TraversableLike.scala:241)
    at scala.collection.TraversableLike$$anonfun$flatMap.apply(TraversableLike.scala:241)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
    at scala.collection.immutable.List.flatMap(List.scala:355)
    at org.apache.spark.sql.catalyst.expressions.Expression.references(Expression.scala:88)
    at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$references.apply(Expression.scala:88)
    at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$references.apply(Expression.scala:88)
    at scala.collection.TraversableLike$$anonfun$flatMap.apply(TraversableLike.scala:241)
    at scala.collection.TraversableLike$$anonfun$flatMap.apply(TraversableLike.scala:241)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
    at scala.collection.immutable.List.flatMap(List.scala:355)
    at org.apache.spark.sql.catalyst.expressions.Expression.references(Expression.scala:88)
    at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$references.apply(QueryPlan.scala:45)
    at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$references.apply(QueryPlan.scala:45)
    at scala.collection.immutable.Stream$$anonfun$flatMap.apply(Stream.scala:497)
    at scala.collection.immutable.Stream$$anonfun$flatMap.apply(Stream.scala:497)

The official Spark documentation says the following about withColumn: this method introduces a projection internally. Therefore, calling it multiple times, for instance via a loop in order to add multiple columns, can generate big plans, which can cause performance issues and even a StackOverflowException. To avoid this, use **select()** with multiple columns at once.

The suggestion is to build the list of select expressions first and then construct the new DataFrame with a single select() call.
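
A minimal sketch of that approach, reusing the data_df, schema_df, and output_path names from the question (the name, start, and length attributes are assumed to come from the same schema JSON):

    # Build every fixed-width slice as a Column expression up front
    columns = [
        data_df.value.substr(int(r.start), int(r.length)).alias(str(r.name))
        for r in schema_df.itertuples()
    ]

    # A single select() adds one projection to the plan, instead of the
    # ~1,600 nested projections produced by calling withColumn() in a loop
    df = data_df.select(*columns)

    df.write.mode("overwrite").option("compression", "gzip").parquet(output_path)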