PySpark 3.1.2 problem "expected zero arguments for construction of ClassDict"

I have set up a Spark cluster running version 3.1.2, and I am using the Python API for Spark. I have loaded some JSON data into a dataframe, and I have to parse a nested column (ADSZ_2) that looks like the following format:

ADSZ_2: [{key,value}, {key,value}, {key,value}]
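
A single input record then looks roughly like this (hypothetical values, inferred from the field names used in the code below):

{"ADSZ_2": [{"string1": "a", "string2": "b"}, {"string1": "c", "string2": "d"}]}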

I developed the following code for this:

...
...
def parseCell(array_data):
    final_list = []
    if array_data is not None:
        for record in array_data:
            record_dict = record.asDict()
            if "string1" in record_dict:
                string1 = remover(record_dict["string1"])
                record_dict["string1"] = string1
            if "string2" in record_dict:
                string2 = remover(record_dict["string2"])
                record_dict["string2"] = string2
            final_list.append(Row(**record_dict))
        return final_list


df = spark.read.load(data_path, multiline="false", format="json")
udf_fun = udf(lambda row: parseCell(row), ArrayType(StructType()))
df.withColumn("new-name", udf_fun(col("ADSZ_2"))).show()
...

When I run the above code, I get the following exception:

21/10/07 09:09:07 ERROR Executor: Exception in task 0.0 in stage 116.0 (TID 132)
net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for pyspark.sql.types._create_row)
    at net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23)
    at net.razorvine.pickle.Unpickler.load_reduce(Unpickler.java:773)
    at net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:213)
    at net.razorvine.pickle.Unpickler.load(Unpickler.java:123)
    at net.razorvine.pickle.Unpickler.loads(Unpickler.java:136)
    at org.apache.spark.sql.execution.python.BatchEvalPythonExec.$anonfun$evaluate(BatchEvalPythonExec.scala:94)
    at scala.collection.Iterator$$anon.nextCur(Iterator.scala:484)
    at scala.collection.Iterator$$anon.hasNext(Iterator.scala:490)
    at scala.collection.Iterator$$anon.hasNext(Iterator.scala:458)
    at scala.collection.Iterator$$anon.hasNext(Iterator.scala:458)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon.hasNext(WholeStageCodegenExec.scala:755)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd(SparkPlan.scala:345)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal(RDD.scala:898)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$adapted(RDD.scala:898)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run(Executor.scala:497)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
21/10/07 09:09:07 WARN TaskSetManager: Lost task 0.0 in stage 116.0 (TID 132) (hadoop-master.local executor driver): net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for pyspark.sql.types._create_row)
    at net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23)
    at net.razorvine.pickle.Unpickler.load_reduce(Unpickler.java:773)
    at net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:213)
    at net.razorvine.pickle.Unpickler.load(Unpickler.java:123)
    at net.razorvine.pickle.Unpickler.loads(Unpickler.java:136)
    at org.apache.spark.sql.execution.python.BatchEvalPythonExec.$anonfun$evaluate(BatchEvalPythonExec.scala:94)

I have tried the various options that have been suggested for this error, but none of those solutions work. Where is the problem?

Is there a better way to accomplish this?
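
The most likely culprit is the return type you give the UDF: ArrayType(StructType()) declares an array of structs with no fields, and returning pyspark.sql.Row objects from a UDF is a classic trigger for the "expected zero arguments for construction of ClassDict (for pyspark.sql.types._create_row)" error. Here is a minimal sketch of how the UDF route could work instead, assuming your remover helper from the elided code and only the two string fields (untested against your real data):

from pyspark.sql.functions import udf, col
from pyspark.sql.types import ArrayType, StringType, StructField, StructType

# Spell out the element schema instead of passing an empty StructType().
element_schema = StructType([
    StructField("string1", StringType(), True),
    StructField("string2", StringType(), True),
])

def parseCellFixed(array_data):
    final_list = []
    if array_data is not None:
        for record in array_data:
            record_dict = record.asDict()
            # Return plain tuples in schema order rather than Row objects;
            # pickling Rows back to the JVM is what raises the ClassDict error.
            final_list.append((
                remover(record_dict["string1"]) if "string1" in record_dict else None,
                remover(record_dict["string2"]) if "string2" in record_dict else None,
            ))
    return final_list

udf_fun = udf(parseCellFixed, ArrayType(element_schema))
df.withColumn("new-name", udf_fun(col("ADSZ_2"))).show()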

That said, I will propose an alternative solution: you can transform the rows using the dataframe's rdd instead of a UDF. Here is a self-contained example, which I have tried to adapt to your data:

from pyspark.sql import Row


df = spark.createDataFrame([Row(ADSZ_2=[{"string1": "a", "string2": "b"}, {"string1": "c", "string2": "d"}]),
                            Row(ADSZ_2=[{"string1": "e", "string2": "f"}, {"string1": "g", "not_taken" : "1", "string2": "h"}]),
                            Row(ADSZ_2=[{"string1": "i", "string2": "j"}, {"string1": "k", "not_taken" : "1", "string2": "l"}]),
                            Row(ADSZ_2=None),
                            Row(ADSZ_2=[None, {"string1": "m", "not_taken" : "1", "string2": "n"}])])
df.show(20, False)
def parseCell(row):
    # Collect the string1/string2 pairs from one row's ADSZ_2 array,
    # skipping null arrays, null records, and any other keys.
    final_list = []
    l = row["ADSZ_2"]
    if l:
        for record_dict in l:
            if record_dict:
                new_dict = {key: val for key, val in record_dict.items()
                            if key in ["string1", "string2"]}
                if new_dict:
                    final_list.append(Row(**new_dict))
    return final_list

df_rdd = df.rdd.flatMap(parseCell)
new_df = spark.createDataFrame(df_rdd)
new_df.show()

Output:

+----------------------------------------------------------------------------+
|ADSZ_2                                                                      |
+----------------------------------------------------------------------------+
|[{string1 -> a, string2 -> b}, {string1 -> c, string2 -> d}]                |
|[{string1 -> e, string2 -> f}, {not_taken -> 1, string1 -> g, string2 -> h}]|
|[{string1 -> i, string2 -> j}, {not_taken -> 1, string1 -> k, string2 -> l}]|
|null                                                                        |
|[null, {not_taken -> 1, string1 -> m, string2 -> n}]                        |
+----------------------------------------------------------------------------+

+-------+-------+
|string1|string2|
+-------+-------+
|      a|      b|
|      c|      d|
|      e|      f|
|      g|      h|
|      i|      j|
|      k|      l|
|      m|      n|
+-------+-------+

You need to make sure that all the rows produced in parseCell contain the correct number of columns; otherwise createDataFrame may fail to infer a consistent schema, as shown below.
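
One way to enforce that is to pass an explicit schema to createDataFrame instead of letting Spark infer one by sampling the RDD. A sketch, with the field names assumed from the example above, converting each Row to a tuple in schema order so fields cannot be mis-assigned positionally:

from pyspark.sql.types import StringType, StructField, StructType

schema = StructType([
    StructField("string1", StringType(), True),
    StructField("string2", StringType(), True),
])

# Missing keys simply become nulls instead of breaking schema inference.
df_tuples = df.rdd.flatMap(parseCell).map(
    lambda r: (r.asDict().get("string1"), r.asDict().get("string2"))
)
new_df = spark.createDataFrame(df_tuples, schema)
new_df.show()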