An error occurred while calling o196.showString
I'm just getting started with Spark and want to convert a list (roughly 1,000 entries) into a Spark DataFrame.
Unfortunately, I get the error mentioned in the title. I can't really figure out what is causing it, and I'd be grateful if someone could help me.
Here is my code so far:
# Pyspark SQL library
from pyspark.sql import SparkSession
from pyspark.sql.types import ArrayType
from pyspark.sql.types import StructField
from pyspark.sql.types import StructType
from pyspark.sql.types import StringType
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import col, isnan, when, count
from pyspark.sql.functions import countDistinct
import findspark
findspark.init("/usr/local/spark/")
spark = SparkSession.builder \
.master("local[*]") \
.appName("project") \
.config("spark.executor.memory", "1gb") \
.getOrCreate()
comments = ["string1", "string2", "string3",...]
schema = StructType([StructField('Comments', StringType(), True),])
# Convert list to RDD
rdd = spark.sparkContext.parallelize(comments)
# Create data frame
df = spark.createDataFrame(rdd,schema)
print(df.schema)
df.show(5)
Here is the full error message:
Py4JJavaError Traceback (most recent call last)
<ipython-input-36-ff4bb233dd51> in <module>
5 df = spark.createDataFrame(rdd,schema)
6 print(df.schema)
----> 7 df.show(5)
/usr/local/spark/python/pyspark/sql/dataframe.py in show(self, n, truncate, vertical)
438 """
439 if isinstance(truncate, bool) and truncate:
--> 440 print(self._jdf.showString(n, 20, vertical))
441 else:
442 print(self._jdf.showString(n, int(truncate), vertical))
/usr/local/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
1302
1303 answer = self.gateway_client.send_command(command)
-> 1304 return_value = get_return_value(
1305 answer, self.gateway_client, self.target_id, self.name)
1306
/usr/local/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
126 def deco(*a, **kw):
127 try:
--> 128 return f(*a, **kw)
129 except py4j.protocol.Py4JJavaError as e:
130 converted = convert_exception(e.java_exception)
/usr/local/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
325 if answer[1] == REFERENCE_TYPE:
--> 326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
328 format(target_id, ".", name), value)
Py4JJavaError: An error occurred while calling o244.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 4.0 failed 1 times, most recent failure: Lost task 0.0 in stage 4.0 (TID 4, jupyter-h11910677, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 605, in main
process()
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 597, in process
serializer.dump_stream(out_iter, outfile)
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 271, in dump_stream
vs = list(itertools.islice(iterator, batch))
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/util.py", line 107, in wrapper
return f(*args, **kwargs)
File "/usr/local/spark/python/pyspark/sql/session.py", line 612, in prepare
verify_func(obj)
File "/usr/local/spark/python/pyspark/sql/types.py", line 1408, in verify
verify_value(obj)
File "/usr/local/spark/python/pyspark/sql/types.py", line 1395, in verify_struct
raise TypeError(new_msg("StructType can not accept object %r in type %s"
TypeError: StructType can not accept object 'Free PDF version\r \n\r \n[https://canvas.umn.edu/courses/188156/files/16432145](https://canvas.umn.edu/courses/188156/files/16432145) \r \n\r \nPhysical copy for purchase\r \n\r \n[https://www.akpress.org/undoing-border-imperialism.html](https://www.akpress.org/undoing-border-imperialism.html) \r \n\r \nText to speech reader for audio form\r \n\r \n[https://www.naturalreaders.com/online/](https://www.naturalreaders.com/online/) \n\nSubreddit for further discussion r/AnarchoBooks' in type <class 'str'>
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:503)
at org.apache.spark.api.python.PythonRunner$$anon.read(PythonRunner.scala:638)
at org.apache.spark.api.python.PythonRunner$$anon.read(PythonRunner.scala:621)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:456)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon.hasNext(Iterator.scala:489)
at scala.collection.Iterator$$anon.hasNext(Iterator.scala:458)
at scala.collection.Iterator$$anon.hasNext(Iterator.scala:458)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon.hasNext(WholeStageCodegenExec.scala:729)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd(SparkPlan.scala:340)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal(RDD.scala:872)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$adapted(RDD.scala:872)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:127)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run(Executor.scala:446)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2059)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage(DAGScheduler.scala:2008)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$adapted(DAGScheduler.scala:2007)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2007)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed(DAGScheduler.scala:973)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$adapted(DAGScheduler.scala:973)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:973)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2239)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2188)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2177)
at org.apache.spark.util.EventLoop$$anon.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:775)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2120)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2139)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:467)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:420)
at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:47)
at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3627)
at org.apache.spark.sql.Dataset.$anonfun$head(Dataset.scala:2697)
at org.apache.spark.sql.Dataset.$anonfun$withAction(Dataset.scala:3618)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId(SQLExecution.scala:100)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId(SQLExecution.scala:87)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3616)
at org.apache.spark.sql.Dataset.head(Dataset.scala:2697)
at org.apache.spark.sql.Dataset.take(Dataset.scala:2904)
at org.apache.spark.sql.Dataset.getRows(Dataset.scala:300)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:337)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 605, in main
process()
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 597, in process
serializer.dump_stream(out_iter, outfile)
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 271, in dump_stream
vs = list(itertools.islice(iterator, batch))
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/util.py", line 107, in wrapper
return f(*args, **kwargs)
File "/usr/local/spark/python/pyspark/sql/session.py", line 612, in prepare
verify_func(obj)
File "/usr/local/spark/python/pyspark/sql/types.py", line 1408, in verify
verify_value(obj)
File "/usr/local/spark/python/pyspark/sql/types.py", line 1395, in verify_struct
raise TypeError(new_msg("StructType can not accept object %r in type %s"
TypeError: StructType can not accept object 'Free PDF version\r \n\r \n[https://canvas.umn.edu/courses/188156/files/16432145](https://canvas.umn.edu/courses/188156/files/16432145) \r \n\r \nPhysical copy for purchase\r \n\r \n[https://www.akpress.org/undoing-border-imperialism.html](https://www.akpress.org/undoing-border-imperialism.html) \r \n\r \nText to speech reader for audio form\r \n\r \n[https://www.naturalreaders.com/online/](https://www.naturalreaders.com/online/) \n\nSubreddit for further discussion r/AnarchoBooks' in type <class 'str'>
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:503)
at org.apache.spark.api.python.PythonRunner$$anon.read(PythonRunner.scala:638)
at org.apache.spark.api.python.PythonRunner$$anon.read(PythonRunner.scala:621)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:456)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon.hasNext(Iterator.scala:489)
at scala.collection.Iterator$$anon.hasNext(Iterator.scala:458)
at scala.collection.Iterator$$anon.hasNext(Iterator.scala:458)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon.hasNext(WholeStageCodegenExec.scala:729)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd(SparkPlan.scala:340)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal(RDD.scala:872)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$adapted(RDD.scala:872)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:127)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run(Executor.scala:446)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
... 1 more
You need to create an RDD of type RDD[Tuple[str]], but the line
rdd = spark.sparkContext.parallelize(comments)
in your code returns an RDD[str], which fails when you try to convert it to a DataFrame with the given schema. Try changing that line to:
rdd = spark.sparkContext.parallelize([(c,) for c in comments])
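Equivalently, if you already have the RDD of plain strings, you could wrap each element in a one-field tuple with map. A minimal sketch, assuming spark, comments, and schema are defined as in your code:
# assuming `spark`, `comments`, and `schema` from the code above
rdd = spark.sparkContext.parallelize(comments)   # RDD[str]
rdd_tuples = rdd.map(lambda c: (c,))             # RDD[Tuple[str]]: one-field rows
df = spark.createDataFrame(rdd_tuples, schema)
df.show(5)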
Note that you can actually pass a list of tuples directly to spark.createDataFrame, like this:
df = spark.createDataFrame([(c,) for c in comments], schema=schema)
df.show()
#+--------+
#|Comments|
#+--------+
#| string1|
#| string2|
#| string3|
#+--------+
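As another option (a hedged sketch, not part of the accepted fix): PySpark's Row class lets you name the field on each record, in which case Spark can infer the schema and you can skip the explicit StructType entirely:
from pyspark.sql import Row

# Each Row carries the column name, so no explicit schema is needed;
# Spark infers a nullable StringType column called 'Comments'.
df = spark.createDataFrame([Row(Comments=c) for c in comments])
df.show(5)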