Error while performing collect action on Spark RDD
- JDK1.8.0_141
- Spark 2.2 (installed with brew)
I'm new to Spark and just installed it with brew. In an iPython notebook, I created an RDD that is just a list of strings. I ran a couple of transformations on it: a map function that turns every item in the list into a tuple, and a reduceByKey function.
wordsList = ['cat', 'elephant', 'rat', 'rat', 'cat']
wordsRDD = sc.parallelize(wordsList, 4)
wordCountsCollected = (wordsRDD
.map(lambda w: (w, 1))
.reduceByKey(lambda x,y: x+y)
.collect())
print(wordCountsCollected)
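For what it's worth, on a working setup this should print the aggregated counts, something like [('cat', 2), ('elephant', 1), ('rat', 2)] (the ordering of the tuples may vary across runs).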
Everything works fine until I run collect on it, at which point I get this traceback:
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<ipython-input-46-88b98f07e38a> in <module>()
4 wordCountsCollected = (wordsRDD
5 .map(lambda w: (w, 1))
----> 6 .reduceByKey(lambda x,y: x+y)
7 .collect())
8 print(wordCountsCollected)
/usr/local/opt/apache-spark/libexec/python/pyspark/rdd.py in collect(self)
807 """
808 with SCCallSiteSync(self.context) as css:
--> 809 port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
810 return list(_load_from_socket(port, self._jrdd_deserializer))
811
/usr/local/opt/apache-spark/libexec/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py in __call__(self, *args)
1131 answer = self.gateway_client.send_command(command)
1132 return_value = get_return_value(
-> 1133 answer, self.gateway_client, self.target_id, self.name)
1134
1135 for temp_arg in temp_args:
/usr/local/opt/apache-spark/libexec/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
317 raise Py4JJavaError(
318 "An error occurred while calling {0}{1}{2}.\n".
--> 319 format(target_id, ".", name), value)
320 else:
321 raise Py4JError(
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 36.0 failed 1 times, most recent failure: Lost task 2.0 in stage 36.0 (TID 97, localhost, executor driver): java.lang.VerifyError: Bad instruction
Exception Details:
Location:
org/apache/spark/storage/ShuffleIndexBlockId.shuffleId()I @4: <illegal>
Reason:
Error exists in the bytecode
Bytecode:
0x0000000: 2ab4 0029 ec
at org.apache.spark.shuffle.IndexShuffleBlockResolver.getIndexFile(IndexShuffleBlockResolver.scala:59)
at org.apache.spark.shuffle.IndexShuffleBlockResolver.writeIndexFileAndCommit(IndexShuffleBlockResolver.scala:141)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:164)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1499)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:1487)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:1486)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1486)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed.apply(DAGScheduler.scala:814)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed.apply(DAGScheduler.scala:814)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1714)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658)
at org.apache.spark.util.EventLoop$$anon.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2062)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2087)
at org.apache.spark.rdd.RDD$$anonfun$collect.apply(RDD.scala:936)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.collect(RDD.scala:935)
at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:458)
at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
at sun.reflect.GeneratedMethodAccessor53.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.VerifyError: Bad instruction
Exception Details:
Location:
org/apache/spark/storage/ShuffleIndexBlockId.shuffleId()I @4: <illegal>
Reason:
Error exists in the bytecode
Bytecode:
0x0000000: 2ab4 0029 ec
at org.apache.spark.shuffle.IndexShuffleBlockResolver.getIndexFile(IndexShuffleBlockResolver.scala:59)
at org.apache.spark.shuffle.IndexShuffleBlockResolver.writeIndexFileAndCommit(IndexShuffleBlockResolver.scala:141)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:164)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
iPython notebook imports
%pylab inline
import pandas as pd
import seaborn as sns
pd.set_option('display.width', 500)
pd.set_option('display.max_columns', 100)
import findspark
findspark.init()
import pyspark
sc = pyspark.SparkContext()
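In hindsight, a quick sanity check right after these imports would have surfaced the problem. A minimal sketch, assuming the standard PYSPARK_PYTHON / PYSPARK_DRIVER_PYTHON environment variables that PySpark honors; run it before creating the SparkContext:

import os
import sys
# Show which interpreter the notebook kernel is actually running on
print(sys.executable, sys.version_info[:3])
# Pin the driver and worker Python to that same interpreter so PySpark
# does not silently pick up a different installation
os.environ["PYSPARK_PYTHON"] = sys.executable
os.environ["PYSPARK_DRIVER_PYTHON"] = sys.executable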
Simple command in spark-shell
Input >>> scala> spark.range(10).show
Output >>>
+---+
| id|
+---+
| 0|
| 1|
| 2|
| 3|
| 4|
| 5|
| 6|
| 7|
| 8|
| 9|
+---+
I solved my problem. I was running a tutorial in the iPython notebook that had to be run on Python 2.7, which is the version I mostly run on my machine. I should have been tipped off by all the print statements using the old syntax "print x" instead of print(x). Anyway, findspark, the essential library for using Apache Spark in an iPython notebook, was installed for Python 3.5 but not for Python 2.7, and after installing it for the older Python version everything worked. Moral of the story: always check your Python version :)! Thanks everyone for the help.
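For anyone hitting something similar, here is a minimal sketch of the check that would have saved me: confirm which interpreter the notebook kernel uses and whether findspark is importable for it (the python2.7 command below is hypothetical; substitute whatever interpreter your kernel actually runs):

import sys
print(sys.version)   # confirm which Python the notebook kernel is using
import findspark     # raises ImportError if findspark is not installed for this interpreter
findspark.init()

# If the import fails, install findspark for that same interpreter from a terminal:
#   python2.7 -m pip install findspark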