PyDev Spark installation

I am trying to run a Spark program written in Python through the LiClipse IDE.

The platform is Windows 10.

I have installed Python using Anaconda.

Then I installed Scala.

After that I installed sbt. However, when I try to run the sbt command from the command line, it does not work.

After that I downloaded the Spark tarball and extracted it.

Below are the environment variables I have defined:

SPARK_HOME = C:\Users\romit.srivastava\spark-1.6.1-bin-hadoop2.6
SPARK_CONF = C:\Users\romit.srivastava\spark-1.6.1-bin-hadoop2.6\conf
SPARK_IP = 10.11.246.153
PYSPARK_SUBMIT_ARGS =  --master local[*] --queue PyDevSpark1.5.2 pyspark-shell

I can import the pyspark module.
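For reference, the usual way to make `import pyspark` resolve inside PyDev/LiClipse is to put Spark's bundled Python bindings on the interpreter path, either in the PyDev interpreter settings or at the top of the script. A minimal sketch, assuming the spark-1.6.1-bin-hadoop2.6 layout shown above (the py4j zip name varies by Spark version):

```python
import os
import sys

# Assumed install location -- adjust to your own SPARK_HOME.
spark_home = os.environ.get(
    "SPARK_HOME", r"C:\Users\romit.srivastava\spark-1.6.1-bin-hadoop2.6")

# Spark ships its Python bindings and a bundled py4j inside SPARK_HOME.
sys.path.insert(0, os.path.join(spark_home, "python"))
sys.path.insert(0, os.path.join(spark_home, "python", "lib", "py4j-0.9-src.zip"))
```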

Now I have created a simple word count program:

'''
Created on 12 May 2016

@author: romit.srivastava
'''

# ADVICE: With PyDev, take care with unused imports (and unused variables):
# comment them all out, otherwise you will get errors at execution.
# Note that even tricks like the directives @PydevCodeAnalysisIgnore and
# @UnusedImport will not solve that issue.

# Imports the PySpark libraries
from pyspark import SparkConf, SparkContext

# The 'os' library allows us to read the environment variable SPARK_HOME defined in the IDE environment
import os

# Configure the Spark context to give a name to the application
sparkConf = SparkConf().setAppName("MyWordCounts")
sc = SparkContext(conf = sparkConf)

# The text file containing the words to count (this is the Spark README file)
textFile = sc.textFile("README.md")

# The code for counting the words (note that the execution mode is lazy)
# Uses the same paradigm Map and Reduce of Hadoop, but fully in memory
wordCounts = textFile.flatMap(lambda line: line.split()) \
.map(lambda word: (word, 1)) \
.reduceByKey(lambda a, b: a+b)

# Executes the DAG (Directed Acyclic Graph) for counting and collecting the result
for wc in wordCounts.collect(): 
    print(wc)
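For intuition, the lazy flatMap/map/reduceByKey pipeline above computes the same result as this eager pure-Python word count (illustration only, no Spark required):

```python
from collections import Counter

def word_counts(lines):
    # flatMap: split each line into words; map + reduceByKey: tally them.
    return Counter(word for line in lines for word in line.split())

print(word_counts(["to be or", "not to be"]))
```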

Now when I run it, the following is the error I get:

Log:

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
16/05/12 15:47:47 INFO SparkContext: Running Spark version 1.6.1
16/05/12 15:47:47 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/05/12 15:47:47 ERROR Shell: Failed to locate the winutils binary in the hadoop binary path
java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.
    at org.apache.hadoop.util.Shell.getQualifiedBinPath(Shell.java:355)
    at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:370)
    at org.apache.hadoop.util.Shell.<clinit>(Shell.java:363)
    at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:79)
    at org.apache.hadoop.security.Groups.parseStaticMapping(Groups.java:104)
    at org.apache.hadoop.security.Groups.<init>(Groups.java:86)
    at org.apache.hadoop.security.Groups.<init>(Groups.java:66)
    at org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:280)
    at org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:271)
    at org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:248)
    at org.apache.hadoop.security.UserGroupInformation.loginUserFromSubject(UserGroupInformation.java:763)
    at org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:748)
    at org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:621)
    at org.apache.spark.util.Utils$$anonfun$getCurrentUserName.apply(Utils.scala:2160)
    at org.apache.spark.util.Utils$$anonfun$getCurrentUserName.apply(Utils.scala:2160)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.util.Utils$.getCurrentUserName(Utils.scala:2160)
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:322)
    at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:59)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:234)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
    at py4j.Gateway.invoke(Gateway.java:214)
    at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:79)
    at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:68)
    at py4j.GatewayConnection.run(GatewayConnection.java:209)
    at java.lang.Thread.run(Thread.java:745)
16/05/12 15:47:47 INFO SecurityManager: Changing view acls to: romit.srivastava
16/05/12 15:47:47 INFO SecurityManager: Changing modify acls to: romit.srivastava
16/05/12 15:47:47 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(romit.srivastava); users with modify permissions: Set(romit.srivastava)
16/05/12 15:47:48 INFO Utils: Successfully started service 'sparkDriver' on port 62512.
16/05/12 15:47:48 INFO Slf4jLogger: Slf4jLogger started
16/05/12 15:47:48 INFO Remoting: Starting remoting
16/05/12 15:47:48 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriverActorSystem@10.11.246.153:62525]
16/05/12 15:47:48 INFO Utils: Successfully started service 'sparkDriverActorSystem' on port 62525.
16/05/12 15:47:48 INFO SparkEnv: Registering MapOutputTracker
16/05/12 15:47:48 INFO SparkEnv: Registering BlockManagerMaster
16/05/12 15:47:48 INFO DiskBlockManager: Created local directory at C:\Users\romit.srivastava\AppData\Local\Temp\blockmgr-31953c2b-3d20-4bfa-a152-673ff000b58c
16/05/12 15:47:48 INFO MemoryStore: MemoryStore started with capacity 511.1 MB
16/05/12 15:47:48 INFO SparkEnv: Registering OutputCommitCoordinator
16/05/12 15:47:49 INFO Utils: Successfully started service 'SparkUI' on port 4040.
16/05/12 15:47:49 INFO SparkUI: Started SparkUI at http://10.11.246.153:4040
16/05/12 15:47:49 INFO Executor: Starting executor ID driver on host localhost
16/05/12 15:47:49 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 62544.
16/05/12 15:47:49 INFO NettyBlockTransferService: Server created on 62544
16/05/12 15:47:49 INFO BlockManagerMaster: Trying to register BlockManager
16/05/12 15:47:49 INFO BlockManagerMasterEndpoint: Registering block manager localhost:62544 with 511.1 MB RAM, BlockManagerId(driver, localhost, 62544)
16/05/12 15:47:49 INFO BlockManagerMaster: Registered BlockManager
16/05/12 15:47:49 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 127.4 KB, free 127.4 KB)
16/05/12 15:47:50 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 13.9 KB, free 141.3 KB)
16/05/12 15:47:50 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:62544 (size: 13.9 KB, free: 511.1 MB)
16/05/12 15:47:50 INFO SparkContext: Created broadcast 0 from textFile at NativeMethodAccessorImpl.java:-2
16/05/12 15:47:50 INFO FileInputFormat: Total input paths to process : 1
16/05/12 15:47:50 INFO SparkContext: Starting job: collect at C:\Users\romit.srivastava\My Documents\LiClipse Workspace\TestProject1\testspark.py:33
16/05/12 15:47:50 INFO DAGScheduler: Registering RDD 3 (reduceByKey at C:\Users\romit.srivastava\My Documents\LiClipse Workspace\TestProject1\testspark.py:30)
16/05/12 15:47:50 INFO DAGScheduler: Got job 0 (collect at C:\Users\romit.srivastava\My Documents\LiClipse Workspace\TestProject1\testspark.py:33) with 2 output partitions
16/05/12 15:47:50 INFO DAGScheduler: Final stage: ResultStage 1 (collect at C:\Users\romit.srivastava\My Documents\LiClipse Workspace\TestProject1\testspark.py:33)
16/05/12 15:47:50 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 0)
16/05/12 15:47:50 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 0)
16/05/12 15:47:50 INFO DAGScheduler: Submitting ShuffleMapStage 0 (PairwiseRDD[3] at reduceByKey at C:\Users\romit.srivastava\My Documents\LiClipse Workspace\TestProject1\testspark.py:30), which has no missing parents
16/05/12 15:47:50 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 8.3 KB, free 149.6 KB)
16/05/12 15:47:50 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 5.4 KB, free 155.1 KB)
16/05/12 15:47:50 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:62544 (size: 5.4 KB, free: 511.1 MB)
16/05/12 15:47:50 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1006
16/05/12 15:47:50 INFO DAGScheduler: Submitting 2 missing tasks from ShuffleMapStage 0 (PairwiseRDD[3] at reduceByKey at C:\Users\romit.srivastava\My Documents\LiClipse Workspace\TestProject1\testspark.py:30)
16/05/12 15:47:50 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
16/05/12 15:47:50 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, partition 0,PROCESS_LOCAL, 2177 bytes)
16/05/12 15:47:50 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, localhost, partition 1,PROCESS_LOCAL, 2177 bytes)
16/05/12 15:47:50 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
16/05/12 15:47:50 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
16/05/12 15:47:50 INFO HadoopRDD: Input split: file:/C:/Users/romit.srivastava/My Documents/LiClipse Workspace/TestProject1/README.md:1679+1680
16/05/12 15:47:50 INFO HadoopRDD: Input split: file:/C:/Users/romit.srivastava/My Documents/LiClipse Workspace/TestProject1/README.md:0+1679
16/05/12 15:47:50 INFO deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
16/05/12 15:47:50 INFO deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
16/05/12 15:47:50 INFO deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
16/05/12 15:47:50 INFO deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
16/05/12 15:47:50 INFO deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
Failed to import the site module
Traceback (most recent call last):
  File "C:\Users\romit.srivastava\Anaconda3\lib\site.py", line 72, in <module>
    import os
  File "C:\Users\romit.srivastava\Anaconda3\lib\os.py", line 666, in <module>
    from _collections_abc import MutableMapping
  File "C:\Users\romit.srivastava\Anaconda3\lib\_collections_abc.py", line 56
    async def _coro(): pass
            ^
SyntaxError: invalid syntax
16/05/12 15:48:00 ERROR Executor: Exception in task 1.0 in stage 0.0 (TID 1)
org.apache.spark.SparkException: Python worker did not connect back in time
    at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:136)
    at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:65)
    at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:134)
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:101)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:342)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.SocketTimeoutException: Accept timed out
    at java.net.DualStackPlainSocketImpl.waitForNewConnection(Native Method)
    at java.net.DualStackPlainSocketImpl.socketAccept(DualStackPlainSocketImpl.java:135)
    at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:409)
    at java.net.PlainSocketImpl.accept(PlainSocketImpl.java:199)
    at java.net.ServerSocket.implAccept(ServerSocket.java:545)
    at java.net.ServerSocket.accept(ServerSocket.java:513)
    at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:131)
    ... 16 more
16/05/12 15:48:00 WARN TaskSetManager: Lost task 1.0 in stage 0.0 (TID 1, localhost): org.apache.spark.SparkException: Python worker did not connect back in time
    at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:136)
    at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:65)
    at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:134)
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:101)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:342)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.SocketTimeoutException: Accept timed out
    at java.net.DualStackPlainSocketImpl.waitForNewConnection(Native Method)
    at java.net.DualStackPlainSocketImpl.socketAccept(DualStackPlainSocketImpl.java:135)
    at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:409)
    at java.net.PlainSocketImpl.accept(PlainSocketImpl.java:199)
    at java.net.ServerSocket.implAccept(ServerSocket.java:545)
    at java.net.ServerSocket.accept(ServerSocket.java:513)
    at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:131)
    ... 16 more

16/05/12 15:48:00 ERROR TaskSetManager: Task 1 in stage 0.0 failed 1 times; aborting job
16/05/12 15:48:00 INFO TaskSchedulerImpl: Cancelling stage 0
16/05/12 15:48:00 INFO TaskSchedulerImpl: Stage 0 was cancelled
16/05/12 15:48:00 INFO DAGScheduler: ShuffleMapStage 0 (reduceByKey at C:\Users\romit.srivastava\My Documents\LiClipse Workspace\TestProject1\testspark.py:30) failed in 10.202 s
16/05/12 15:48:00 INFO Executor: Executor is trying to kill task 0.0 in stage 0.0 (TID 0)
16/05/12 15:48:00 INFO DAGScheduler: Job 0 failed: collect at C:\Users\romit.srivastava\My Documents\LiClipse Workspace\TestProject1\testspark.py:33, took 10.268549 s
Traceback (most recent call last):
  File "C:\Users\romit.srivastava\My Documents\LiClipse Workspace\TestProject1\testspark.py", line 33, in <module>
    for wc in wordCounts.collect(): 
  File "C:\Users\romit.srivastava\spark-1.6.1-bin-hadoop2.6\python\pyspark\rdd.py", line 771, in collect
    port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
  File "C:\Users\romit.srivastava\spark-1.6.1-bin-hadoop2.6\python\lib\py4j-0.9-src.zip\py4j\java_gateway.py", line 813, in __call__
  File "C:\Users\romit.srivastava\spark-1.6.1-bin-hadoop2.6\python\lib\py4j-0.9-src.zip\py4j\protocol.py", line 308, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 0.0 failed 1 times, most recent failure: Lost task 1.0 in stage 0.0 (TID 1, localhost): org.apache.spark.SparkException: Python worker did not connect back in time
    at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:136)
    at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:65)
    at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:134)
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:101)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:342)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.SocketTimeoutException: Accept timed out
    at java.net.DualStackPlainSocketImpl.waitForNewConnection(Native Method)
    at java.net.DualStackPlainSocketImpl.socketAccept(DualStackPlainSocketImpl.java:135)
    at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:409)
    at java.net.PlainSocketImpl.accept(PlainSocketImpl.java:199)
    at java.net.ServerSocket.implAccept(ServerSocket.java:545)
    at java.net.ServerSocket.accept(ServerSocket.java:513)
    at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:131)
    ... 16 more

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:1419)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:1418)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed.apply(DAGScheduler.scala:799)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed.apply(DAGScheduler.scala:799)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
    at org.apache.spark.util.EventLoop$$anon.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929)
    at org.apache.spark.rdd.RDD$$anonfun$collect.apply(RDD.scala:927)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:926)
    at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:405)
    at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
    at py4j.Gateway.invoke(Gateway.java:259)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:209)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: Python worker did not connect back in time
    at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:136)
    at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:65)
    at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:134)
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:101)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:342)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    ... 1 more
Caused by: java.net.SocketTimeoutException: Accept timed out
    at java.net.DualStackPlainSocketImpl.waitForNewConnection(Native Method)
    at java.net.DualStackPlainSocketImpl.socketAccept(DualStackPlainSocketImpl.java:135)
    at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:409)
    at java.net.PlainSocketImpl.accept(PlainSocketImpl.java:199)
    at java.net.ServerSocket.implAccept(ServerSocket.java:545)
    at java.net.ServerSocket.accept(ServerSocket.java:513)
    at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:131)
    ... 16 more


Failed to import the site module
Traceback (most recent call last):
  File "C:\Users\romit.srivastava\Anaconda3\lib\site.py", line 72, in <module>
    import os
  File "C:\Users\romit.srivastava\Anaconda3\lib\os.py", line 666, in <module>
    from _collections_abc import MutableMapping
  File "C:\Users\romit.srivastava\Anaconda3\lib\_collections_abc.py", line 56
    async def _coro(): pass
            ^
SyntaxError: invalid syntax
16/05/12 15:48:00 INFO SparkContext: Invoking stop() from shutdown hook
16/05/12 15:48:00 INFO SparkUI: Stopped Spark web UI at http://10.11.246.153:4040
16/05/12 15:48:00 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
16/05/12 15:48:00 INFO MemoryStore: MemoryStore cleared
16/05/12 15:48:00 INFO BlockManager: BlockManager stopped
16/05/12 15:48:00 INFO BlockManagerMaster: BlockManagerMaster stopped
16/05/12 15:48:00 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
16/05/12 15:48:00 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
16/05/12 15:48:00 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
16/05/12 15:48:00 INFO SparkContext: Successfully stopped SparkContext
16/05/12 15:48:00 INFO ShutdownHookManager: Shutdown hook called
16/05/12 15:48:00 INFO ShutdownHookManager: Deleting directory C:\Users\romit.srivastava\AppData\Local\Temp\spark-0f7da00b-c7fc-40c2-8340-5d0d43c2ff6c\pyspark-f64bcb90-0530-4008-bb15-de92f044bd63
16/05/12 15:48:00 INFO ShutdownHookManager: Deleting directory C:\Users\romit.srivastava\AppData\Local\Temp\spark-0f7da00b-c7fc-40c2-8340-5d0d43c2ff6c
SUCCESS: The process with PID 3304 (child process of PID 2208) has been terminated.
SUCCESS: The process with PID 2208 (child process of PID 12832) has been terminated.
SUCCESS: The process with PID 12832 (child process of PID 12268) has been terminated.

I finally got it to run.

The solution was as follows.

First I set the HADOOP_HOME variable:

# Raw strings: without the r prefix, \U in "C:\Users" is parsed as a Unicode escape.
os.environ['HADOOP_HOME'] = r"C:\Users\romit.srivastava\hadoop-2.6.0"
sys.path.append(r"C:\Users\romit.srivastava\hadoop-2.6.0")

After that I defined the PYSPARK_PYTHON variable (the `SyntaxError` on `async def` in the log suggests the workers were being launched with a different, older Python interpreter):

os.environ["PYSPARK_PYTHON"] = r"C:\Users\romit.srivastava\Anaconda3\python.exe"
sys.path.append(r"C:\Users\romit.srivastava\Anaconda3\python.exe")
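Putting the two fixes together, the top of the script ends up looking like the sketch below (the paths are from my machine; substitute your own). The key point is that both assignments must run before the SparkContext is created, otherwise the workers inherit the wrong interpreter and time out with "Python worker did not connect back in time":

```python
import os
import sys

# Assumed local install paths -- replace with your own directories.
hadoop_home = r"C:\Users\romit.srivastava\hadoop-2.6.0"
pyspark_python = r"C:\Users\romit.srivastava\Anaconda3\python.exe"

# Raw strings (r"...") keep the backslashes literal; a plain "C:\Users\..."
# fails because \U starts a Unicode escape in Python 3 string literals.
os.environ["HADOOP_HOME"] = hadoop_home        # lets Spark locate bin\winutils.exe
os.environ["PYSPARK_PYTHON"] = pyspark_python  # interpreter used by Python workers

sys.path.append(hadoop_home)
```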