An error occurred while calling o196.showString

I'm getting started with Spark and want to convert a list (about 1000 entries) into a Spark DataFrame.

Unfortunately, I get the error mentioned in the title. I can't really figure out what is causing it, and I'd be grateful if someone could help. Here is my code so far:

# Pyspark SQL library
from pyspark.sql import SparkSession
from pyspark.sql.types import ArrayType
from pyspark.sql.types import StructField
from pyspark.sql.types import StructType
from pyspark.sql.types import StringType
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import col, isnan, when, count
from pyspark.sql.functions import countDistinct


import findspark

findspark.init("/usr/local/spark/")

spark = SparkSession.builder \
   .master("local[*]") \
   .appName("project") \
   .config("spark.executor.memory", "1gb") \
   .getOrCreate()
comments = ["string1", "string2", "string3",...]

schema = StructType([StructField('Comments', StringType(), True),])

# Convert list to RDD
rdd = spark.sparkContext.parallelize(comments)

# Create data frame
df = spark.createDataFrame(rdd, schema)
print(df.schema)
df.show(5)

Here is the full error message:

Py4JJavaError                             Traceback (most recent call last)
<ipython-input-36-ff4bb233dd51> in <module>
      5 df = spark.createDataFrame(rdd,schema)
      6 print(df.schema)
----> 7 df.show(5)

/usr/local/spark/python/pyspark/sql/dataframe.py in show(self, n, truncate, vertical)
    438         """
    439         if isinstance(truncate, bool) and truncate:
--> 440             print(self._jdf.showString(n, 20, vertical))
    441         else:
    442             print(self._jdf.showString(n, int(truncate), vertical))

/usr/local/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1302 
   1303         answer = self.gateway_client.send_command(command)
-> 1304         return_value = get_return_value(
   1305             answer, self.gateway_client, self.target_id, self.name)
   1306 

/usr/local/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
    126     def deco(*a, **kw):
    127         try:
--> 128             return f(*a, **kw)
    129         except py4j.protocol.Py4JJavaError as e:
    130             converted = convert_exception(e.java_exception)

/usr/local/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    324             value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325             if answer[1] == REFERENCE_TYPE:
--> 326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
    328                     format(target_id, ".", name), value)

Py4JJavaError: An error occurred while calling o244.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 4.0 failed 1 times, most recent failure: Lost task 0.0 in stage 4.0 (TID 4, jupyter-h11910677, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 605, in main
    process()
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 597, in process
    serializer.dump_stream(out_iter, outfile)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 271, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/util.py", line 107, in wrapper
    return f(*args, **kwargs)
  File "/usr/local/spark/python/pyspark/sql/session.py", line 612, in prepare
    verify_func(obj)
  File "/usr/local/spark/python/pyspark/sql/types.py", line 1408, in verify
    verify_value(obj)
  File "/usr/local/spark/python/pyspark/sql/types.py", line 1395, in verify_struct
    raise TypeError(new_msg("StructType can not accept object %r in type %s"
TypeError: StructType can not accept object 'Free PDF version\r  \n\r  \n[https://canvas.umn.edu/courses/188156/files/16432145](https://canvas.umn.edu/courses/188156/files/16432145) \r  \n\r  \nPhysical copy for purchase\r  \n\r  \n[https://www.akpress.org/undoing-border-imperialism.html](https://www.akpress.org/undoing-border-imperialism.html) \r  \n\r  \nText to speech reader for audio form\r  \n\r  \n[https://www.naturalreaders.com/online/](https://www.naturalreaders.com/online/) \n\nSubreddit for further discussion r/AnarchoBooks' in type <class 'str'>

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:503)
    at org.apache.spark.api.python.PythonRunner$$anon.read(PythonRunner.scala:638)
    at org.apache.spark.api.python.PythonRunner$$anon.read(PythonRunner.scala:621)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:456)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon.hasNext(Iterator.scala:489)
    at scala.collection.Iterator$$anon.hasNext(Iterator.scala:458)
    at scala.collection.Iterator$$anon.hasNext(Iterator.scala:458)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon.hasNext(WholeStageCodegenExec.scala:729)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd(SparkPlan.scala:340)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal(RDD.scala:872)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$adapted(RDD.scala:872)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:127)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run(Executor.scala:446)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2059)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage(DAGScheduler.scala:2008)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$adapted(DAGScheduler.scala:2007)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2007)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed(DAGScheduler.scala:973)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$adapted(DAGScheduler.scala:973)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:973)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2239)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2188)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2177)
    at org.apache.spark.util.EventLoop$$anon.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:775)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2120)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2139)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:467)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:420)
    at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:47)
    at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3627)
    at org.apache.spark.sql.Dataset.$anonfun$head(Dataset.scala:2697)
    at org.apache.spark.sql.Dataset.$anonfun$withAction(Dataset.scala:3618)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId(SQLExecution.scala:100)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId(SQLExecution.scala:87)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3616)
    at org.apache.spark.sql.Dataset.head(Dataset.scala:2697)
    at org.apache.spark.sql.Dataset.take(Dataset.scala:2904)
    at org.apache.spark.sql.Dataset.getRows(Dataset.scala:300)
    at org.apache.spark.sql.Dataset.showString(Dataset.scala:337)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 605, in main
    process()
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 597, in process
    serializer.dump_stream(out_iter, outfile)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 271, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/util.py", line 107, in wrapper
    return f(*args, **kwargs)
  File "/usr/local/spark/python/pyspark/sql/session.py", line 612, in prepare
    verify_func(obj)
  File "/usr/local/spark/python/pyspark/sql/types.py", line 1408, in verify
    verify_value(obj)
  File "/usr/local/spark/python/pyspark/sql/types.py", line 1395, in verify_struct
    raise TypeError(new_msg("StructType can not accept object %r in type %s"
TypeError: StructType can not accept object 'Free PDF version\r  \n\r  \n[https://canvas.umn.edu/courses/188156/files/16432145](https://canvas.umn.edu/courses/188156/files/16432145) \r  \n\r  \nPhysical copy for purchase\r  \n\r  \n[https://www.akpress.org/undoing-border-imperialism.html](https://www.akpress.org/undoing-border-imperialism.html) \r  \n\r  \nText to speech reader for audio form\r  \n\r  \n[https://www.naturalreaders.com/online/](https://www.naturalreaders.com/online/) \n\nSubreddit for further discussion r/AnarchoBooks' in type <class 'str'>

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:503)
    at org.apache.spark.api.python.PythonRunner$$anon.read(PythonRunner.scala:638)
    at org.apache.spark.api.python.PythonRunner$$anon.read(PythonRunner.scala:621)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:456)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon.hasNext(Iterator.scala:489)
    at scala.collection.Iterator$$anon.hasNext(Iterator.scala:458)
    at scala.collection.Iterator$$anon.hasNext(Iterator.scala:458)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon.hasNext(WholeStageCodegenExec.scala:729)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd(SparkPlan.scala:340)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal(RDD.scala:872)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$adapted(RDD.scala:872)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:127)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run(Executor.scala:446)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    ... 1 more

You need to create an RDD whose elements are row-like, i.e. RDD[Tuple[str]], but in your code the line:

rdd = spark.sparkContext.parallelize(comments)

returns an RDD[str]. A bare str cannot be matched against the single-field StructType, so the conversion to a DataFrame with the given schema fails with the TypeError shown above.

Try changing that line to:

rdd = spark.sparkContext.parallelize([(c,) for c in comments])
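Equivalently, if you already have an RDD of plain strings, you can wrap each element in a one-element tuple with map. A minimal sketch of the same fix:

# Same fix applied to an existing RDD[str]: wrap each string in a 1-tuple
# so every element becomes a row with exactly one field.
rdd = spark.sparkContext.parallelize(comments).map(lambda c: (c,))
df = spark.createDataFrame(rdd, schema)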

Note that you can actually pass a list of tuples directly to spark.createDataFrame, like this:

df = spark.createDataFrame([(c,) for c in comments], schema=schema)

df.show()
#+--------+
#|Comments|
#+--------+
#| string1|
#| string2|
#| string3|
#+--------+
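
As a side note, for a single string column you can also skip the StructType and pass the plain list together with an atomic StringType() schema; Spark then wraps each value into one column (named value by default), which toDF can rename. A sketch, assuming the same comments list and imports as above:

# Alternative: let Spark wrap each plain string in a single column.
# The default column name is "value"; toDF renames it to "Comments".
df2 = spark.createDataFrame(comments, StringType()).toDF("Comments")
df2.show(3)

This avoids building the tuples by hand, at the cost of relying on the implicit default column name.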