Py4JJavaError while writing PySpark dataframe to Parquet file

In short, I have 100k rows of data in a CSV file. Here is a sample of it:

ID,Name,Surname,Birthdate,Details
0, Agjqyru, Qtltzu, 1923-02-23, "{City=Neftchala, Gender=male, Education=collage}"
1, Zkaczi, Gvuvwwle, 2002-02-28, "{City=Mingachevir, Gender=female, Education=doctor}"
2, Hkbfros, Llmufk, 1948-02-29, "{City=Ujar, Gender=male, Education=collage}"
3, Dddtulkeo, Fdnccbp, 1903-07-01, "{City=Dashkasan, Gender=female, Education=bachelor}"
4, Wssqm, Kzekihhqjmrd, 1935-05-10, "{City=Baku, Gender=female, Education=uneducated}"
5, Iurg, Nglzxwu, 1915-04-02, "{City=Khojavend, Gender=male, Education=school}"

My task is to split the Details column into 3 separate columns and save the data as a Parquet file. Here is my attempt so far:

from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql import SQLContext

sparkContext = SparkContext.getOrCreate()
sqlContext = SQLContext(sparkContext).getOrCreate(sparkContext)
sparkSession = SparkSession.builder.master("local").appName("parquet_example").getOrCreate()

# read csv file 
__DATA = spark.read.csv('data.csv', header = True)

# divide column details into 3
__DATA = __DATA.withColumn("Details", split(regexp_replace(regexp_replace((regexp_replace("Details",'\{|}',"")),'\:',','),'\"|"',""),','))\
.withColumn("City", element_at("Details",1))\
.withColumn("Gender", element_at("Details",2))\
.withColumn("Education",element_at("Details",3)).drop("Details").rdd

# clean data
__DATA = __DATA.map(lambda x : (x[0], x[1], x[2], x[3][x[3].index("=")+1:], x[4][x[4].index("=")+1:], x[5][x[5].index("=")+1:]))

dfSchema = StructType([       
    StructField('Name', StringType(), True),
    StructField('Surname', StringType(), True),
    StructField('Birthdate', StringType(), True),
    StructField('City', StringType(), True),
    StructField('Gender', StringType(), True),
    StructField('Education', StringType(), True)
])

__DATA = __DATA.toDF(dfSchema)

However, whenever I try to save it to a Parquet file with the following code, I get a Py4JJavaError:

__DATA.write.mode("overwrite").parquet("people.parquet") # write it to parquet

So, what am I missing?

Here is the full error stack:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-55-67be3fc6fc33> in <module>
----> 1 __DATA.write.mode("overwrite").parquet("people.parquet") # write it to parquet

C:\<user>\spark-3.0.0-bin-hadoop2.7\python\pyspark\sql\readwriter.py in parquet(self, path, mode, partitionBy, compression)
    934             self.partitionBy(partitionBy)
    935         self._set_opts(compression=compression)
--> 936         self._jwrite.parquet(path)
    937 
    938     @since(1.6)

C:\<user>\spark-3.0.0-bin-hadoop2.7\python\lib\py4j-0.10.9-src.zip\py4j\java_gateway.py in __call__(self, *args)
   1302 
   1303         answer = self.gateway_client.send_command(command)
-> 1304         return_value = get_return_value(
   1305             answer, self.gateway_client, self.target_id, self.name)
   1306 

C:\<user>\spark-3.0.0-bin-hadoop2.7\python\pyspark\sql\utils.py in deco(*a, **kw)
    129     def deco(*a, **kw):
    130         try:
--> 131             return f(*a, **kw)
    132         except py4j.protocol.Py4JJavaError as e:
    133             converted = convert_exception(e.java_exception)

C:\<user>\spark-3.0.0-bin-hadoop2.7\python\lib\py4j-0.10.9-src.zip\py4j\protocol.py in get_return_value(answer, gateway_client, target_id, name)
    324             value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325             if answer[1] == REFERENCE_TYPE:
--> 326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
    328                     format(target_id, ".", name), value)

Py4JJavaError: An error occurred while calling o1290.parquet.
: org.apache.spark.SparkException: Job aborted.
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:226)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:178)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:108)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:106)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:131)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute(SparkPlan.scala:175)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery(SparkPlan.scala:213)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:122)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:121)
    at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand(DataFrameWriter.scala:944)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId(SQLExecution.scala:100)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId(SQLExecution.scala:87)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:944)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:396)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:380)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:269)
    at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:829)
    at sun.reflect.GeneratedMethodAccessor131.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 35.0 failed 1 times, most recent failure: Lost task 1.0 in stage 35.0 (TID 60, bkhddev01, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\<user>\spark-3.0.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 605, in main
  File "C:\<user>\spark-3.0.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 597, in process
  File "C:\<user>\spark-3.0.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\serializers.py", line 271, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "C:\<user>\spark-3.0.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\util.py", line 107, in wrapper
    return f(*args, **kwargs)
  File "<ipython-input-53-6c5f08998e2a>", line 12, in <lambda>
ValueError: substring not found

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:503)
    at org.apache.spark.api.python.PythonRunner$$anon.read(PythonRunner.scala:638)
    at org.apache.spark.api.python.PythonRunner$$anon.read(PythonRunner.scala:621)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:456)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon.hasNext(Iterator.scala:489)
    at scala.collection.Iterator$$anon.hasNext(Iterator.scala:458)
    at scala.collection.Iterator$$anon.hasNext(Iterator.scala:458)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon.hasNext(WholeStageCodegenExec.scala:729)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:260)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write(FileFormatWriter.scala:205)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:127)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run(Executor.scala:444)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2023)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage(DAGScheduler.scala:1972)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$adapted(DAGScheduler.scala:1971)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1971)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed(DAGScheduler.scala:950)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$adapted(DAGScheduler.scala:950)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:950)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2203)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2152)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2141)
    at org.apache.spark.util.EventLoop$$anon.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:752)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2093)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:195)
    ... 32 more
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\<user>\spark-3.0.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 605, in main
  File "C:\<user>\spark-3.0.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 597, in process
  File "C:\<user>\spark-3.0.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\serializers.py", line 271, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "C:\<user>\spark-3.0.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\util.py", line 107, in wrapper
    return f(*args, **kwargs)
  File "<ipython-input-53-6c5f08998e2a>", line 12, in <lambda>
ValueError: substring not found

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:503)
    at org.apache.spark.api.python.PythonRunner$$anon.read(PythonRunner.scala:638)
    at org.apache.spark.api.python.PythonRunner$$anon.read(PythonRunner.scala:621)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:456)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon.hasNext(Iterator.scala:489)
    at scala.collection.Iterator$$anon.hasNext(Iterator.scala:458)
    at scala.collection.Iterator$$anon.hasNext(Iterator.scala:458)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon.hasNext(WholeStageCodegenExec.scala:729)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:260)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write(FileFormatWriter.scala:205)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:127)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run(Executor.scala:444)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more

The error is caused by the following code.

x[3][x[3].index("=")+1:]

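At index 3 you have the Birthdate field, which contains no "=", so index("=") raises ValueError: substring not found. You should start the slicing operation after index 3, since that is where the fields that came from the Details column begin. The error only shows up at the parquet call because Spark evaluates the map lazily: the lambda does not run until the write triggers the job.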
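To make the index problem concrete, here is a minimal plain-Python sketch of a corrected row cleaner. It does not need Spark to run. The helper names `value_after_equals` and `clean_row` and the sample tuple are hypothetical, and the sketch assumes each row holds ID, Name, Surname, Birthdate at indices 0 to 3, followed by the three "key=value" pieces from Details at indices 4 to 6.

```python
def value_after_equals(field):
    """Return the text after the first "=", or the field unchanged if there is none."""
    # str.partition never raises, unlike str.index, when "=" is missing.
    _key, sep, value = field.partition("=")
    return value.strip() if sep else field


def clean_row(row):
    """Keep the first four fields as they are and strip the "key=" prefix from the rest."""
    # Indices 0-3: ID, Name, Surname, Birthdate (assumed layout, no "=" in them).
    # Indices 4-6: the "key=value" pieces produced by splitting Details.
    return tuple(row[:4]) + tuple(value_after_equals(f) for f in row[4:7])


# Hypothetical row shaped like the data after Details has been split on ",".
sample = ("0", "Agjqyru", "Qtltzu", "1923-02-23",
          "City=Neftchala", " Gender=male", " Education=collage")
cleaned = clean_row(sample)
print(cleaned)
```

If something like this were passed to the RDD's map in place of the lambda, each output tuple would have seven values, so the schema given to toDF would presumably also need an ID field in front of Name.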

Instead of converting the DF to an RDD, you can do the cleaning at the DF level.

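from pyspark.sql.functions import col, regexp_replace, split

# Strip the braces, split Details on commas, then keep the value after "=" in each piece.
__DATA = __DATA.withColumn("Details", split(regexp_replace("Details", r"[{}]", ""), ","))\
.withColumn("City", split(col("Details")[0], "=")[1])\
.withColumn("Gender", split(col("Details")[1], "=")[1])\
.withColumn("Education", split(col("Details")[2], "=")[1]).drop("Details")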