Spark Streaming network_wordcount.py does not print results

I start an ncat console and then submit the simple example application network_wordcount.py.
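For reference, the example does roughly the following (a sketch of the network_wordcount.py that ships with Spark 1.2, under examples/src/main/python/streaming/):

import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

if __name__ == "__main__":
    # Usage: network_wordcount.py <hostname> <port>
    sc = SparkContext(appName="PythonStreamingNetworkWordCount")
    ssc = StreamingContext(sc, 1)  # 1-second batch interval

    # Read lines from the TCP socket, split them into words,
    # and count each word within the batch.
    lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
    counts = lines.flatMap(lambda line: line.split(" ")) \
                  .map(lambda word: (word, 1)) \
                  .reduceByKey(lambda a, b: a + b)
    counts.pprint()  # this is the call that should print (hello, 1) etc.

    ssc.start()
    ssc.awaitTermination()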

When I type words into the ncat console, I see the following output:

...
15/01/20 16:55:19 INFO scheduler.JobScheduler: Added jobs for time 1421769319000 ms
15/01/20 16:55:19 INFO storage.MemoryStore: ensureFreeSpace(12) called with curMem=65052, maxMem=280248975
15/01/20 16:55:19 INFO storage.MemoryStore: Block input-0-1421769319200 stored as bytes in memory (estimated size 12.0 B, free 267.2 MB)
15/01/20 16:55:19 INFO storage.BlockManagerInfo: Added input-0-1421769319200 in memory on localhost:34754 (size: 12.0 B, free: 267.2 MB)
15/01/20 16:55:19 INFO storage.BlockManagerMaster: Updated info of block input-0-1421769319200 
15/01/20 16:55:19 WARN storage.BlockManager: Block input-0-1421769319200 replicated to only 0 peer(s) instead of 1 peers
15/01/20 16:55:19 INFO receiver.BlockGenerator: Pushed block input-0-1421769319200
...

So Spark seems to receive something, but it does not print the expected result, e.g. (hello, 1).

Any help is appreciated.

Regards, Felix

PS: I am on CentOS, and I have to invoke ncat as follows for Spark to receive any data: ncat -lkv6 9999

The full log is as follows:

15/01/20 16:55:08 INFO spark.SecurityManager: Changing view acls to: root
15/01/20 16:55:08 INFO spark.SecurityManager: Changing modify acls to: root
15/01/20 16:55:08 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); users with modify permissions: Set(root)
15/01/20 16:55:09 INFO slf4j.Slf4jLogger: Slf4jLogger started
15/01/20 16:55:09 INFO Remoting: Starting remoting
15/01/20 16:55:09 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@srv-lab-t-734.zhaw.ch:54048]
15/01/20 16:55:09 INFO util.Utils: Successfully started service 'sparkDriver' on port 54048.
15/01/20 16:55:09 INFO spark.SparkEnv: Registering MapOutputTracker
15/01/20 16:55:09 INFO spark.SparkEnv: Registering BlockManagerMaster
15/01/20 16:55:10 INFO storage.DiskBlockManager: Created local directory at /tmp/spark-local-20150120165510-b6c3
15/01/20 16:55:10 INFO storage.MemoryStore: MemoryStore started with capacity 267.3 MB
15/01/20 16:55:10 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/01/20 16:55:10 INFO spark.HttpFileServer: HTTP File server directory is /tmp/spark-51e3b95d-8b3d-4eed-8571-b38205b7ba9c
15/01/20 16:55:10 INFO spark.HttpServer: Starting HTTP Server
15/01/20 16:55:11 INFO server.Server: jetty-8.y.z-SNAPSHOT
15/01/20 16:55:11 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:37603
15/01/20 16:55:11 INFO util.Utils: Successfully started service 'HTTP file server' on port 37603.
15/01/20 16:55:11 INFO server.Server: jetty-8.y.z-SNAPSHOT
15/01/20 16:55:11 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
15/01/20 16:55:11 INFO util.Utils: Successfully started service 'SparkUI' on port 4040.
15/01/20 16:55:11 INFO ui.SparkUI: Started SparkUI at http://srv-lab-t-734.zhaw.ch:4040
15/01/20 16:55:11 INFO util.Utils: Copying /root/tmp/spark/spark-1.2.0/examples/src/main/python/streaming/network_wordcount.py to /tmp/spark-c048d2f8-8ea8-4a13-a160-758c2875abec/network_wordcount.py
15/01/20 16:55:11 INFO spark.SparkContext: Added file file:/root/tmp/spark/spark-1.2.0/examples/src/main/python/streaming/network_wordcount.py at http://160.85.30.108:37603/files/network_wordcount.py with timestamp 1421769311787
15/01/20 16:55:12 INFO util.AkkaUtils: Connecting to HeartbeatReceiver: akka.tcp://sparkDriver@srv-lab-t-734.zhaw.ch:54048/user/HeartbeatReceiver
15/01/20 16:55:12 INFO netty.NettyBlockTransferService: Server created on 34754
15/01/20 16:55:12 INFO storage.BlockManagerMaster: Trying to register BlockManager
15/01/20 16:55:12 INFO storage.BlockManagerMasterActor: Registering block manager localhost:34754 with 267.3 MB RAM, BlockManagerId(<driver>, localhost, 34754)
15/01/20 16:55:12 INFO storage.BlockManagerMaster: Registered BlockManager
15/01/20 16:55:12 INFO scheduler.ReceiverTracker: ReceiverTracker started
15/01/20 16:55:13 INFO dstream.ForEachDStream: metadataCleanupDelay = -1
15/01/20 16:55:13 INFO python.PythonTransformedDStream: metadataCleanupDelay = -1
15/01/20 16:55:13 INFO dstream.SocketInputDStream: metadataCleanupDelay = -1
15/01/20 16:55:13 INFO dstream.SocketInputDStream: Slide time = 1000 ms
15/01/20 16:55:13 INFO dstream.SocketInputDStream: Storage level = StorageLevel(false, false, false, false, 1)
15/01/20 16:55:13 INFO dstream.SocketInputDStream: Checkpoint interval = null
15/01/20 16:55:13 INFO dstream.SocketInputDStream: Remember duration = 1000 ms
15/01/20 16:55:13 INFO dstream.SocketInputDStream: Initialized and validated org.apache.spark.streaming.dstream.SocketInputDStream@162df93d
15/01/20 16:55:13 INFO python.PythonTransformedDStream: Slide time = 1000 ms
15/01/20 16:55:13 INFO python.PythonTransformedDStream: Storage level = StorageLevel(false, false, false, false, 1)
15/01/20 16:55:13 INFO python.PythonTransformedDStream: Checkpoint interval = null
15/01/20 16:55:13 INFO python.PythonTransformedDStream: Remember duration = 1000 ms
15/01/20 16:55:13 INFO python.PythonTransformedDStream: Initialized and validated org.apache.spark.streaming.api.python.PythonTransformedDStream@24461f2b
15/01/20 16:55:13 INFO dstream.ForEachDStream: Slide time = 1000 ms
15/01/20 16:55:13 INFO dstream.ForEachDStream: Storage level = StorageLevel(false, false, false, false, 1)
15/01/20 16:55:13 INFO dstream.ForEachDStream: Checkpoint interval = null
15/01/20 16:55:13 INFO dstream.ForEachDStream: Remember duration = 1000 ms
15/01/20 16:55:13 INFO dstream.ForEachDStream: Initialized and validated org.apache.spark.streaming.dstream.ForEachDStream@77a852a7
15/01/20 16:55:13 INFO scheduler.ReceiverTracker: Starting 1 receivers
15/01/20 16:55:13 INFO util.RecurringTimer: Started timer for JobGenerator at time 1421769314000
15/01/20 16:55:13 INFO scheduler.JobGenerator: Started JobGenerator at 1421769314000 ms
15/01/20 16:55:13 INFO scheduler.JobScheduler: Started JobScheduler
15/01/20 16:55:13 INFO spark.SparkContext: Starting job: start at NativeMethodAccessorImpl.java:-2
15/01/20 16:55:13 INFO scheduler.DAGScheduler: Got job 0 (start at NativeMethodAccessorImpl.java:-2) with 1 output partitions (allowLocal=false)
15/01/20 16:55:13 INFO scheduler.DAGScheduler: Final stage: Stage 0(start at NativeMethodAccessorImpl.java:-2)
15/01/20 16:55:13 INFO scheduler.DAGScheduler: Parents of final stage: List()
15/01/20 16:55:13 INFO scheduler.DAGScheduler: Missing parents: List()
15/01/20 16:55:13 INFO scheduler.DAGScheduler: Submitting Stage 0 (ParallelCollectionRDD[0] at start at NativeMethodAccessorImpl.java:-2), which has no missing parents
15/01/20 16:55:13 INFO storage.MemoryStore: ensureFreeSpace(35112) called with curMem=0, maxMem=280248975
15/01/20 16:55:13 INFO storage.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 34.3 KB, free 267.2 MB)
15/01/20 16:55:13 INFO storage.MemoryStore: ensureFreeSpace(19994) called with curMem=35112, maxMem=280248975
15/01/20 16:55:13 INFO storage.MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 19.5 KB, free 267.2 MB)
15/01/20 16:55:13 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:34754 (size: 19.5 KB, free: 267.2 MB)
15/01/20 16:55:13 INFO storage.BlockManagerMaster: Updated info of block broadcast_0_piece0
15/01/20 16:55:13 INFO spark.SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:838
15/01/20 16:55:13 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from Stage 0 (ParallelCollectionRDD[0] at start at NativeMethodAccessorImpl.java:-2)
15/01/20 16:55:13 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
15/01/20 16:55:13 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, PROCESS_LOCAL, 1937 bytes)
15/01/20 16:55:13 INFO executor.Executor: Running task 0.0 in stage 0.0 (TID 0)
15/01/20 16:55:13 INFO executor.Executor: Fetching http://160.85.30.108:37603/files/network_wordcount.py with timestamp 1421769311787
15/01/20 16:55:13 INFO util.Utils: Fetching http://160.85.30.108:37603/files/network_wordcount.py to /tmp/fetchFileTemp2647961966239862063.tmp
15/01/20 16:55:13 INFO receiver.ReceiverSupervisorImpl: Registered receiver 0
15/01/20 16:55:13 INFO scheduler.ReceiverTracker: Registered receiver for stream 0 from akka://sparkDriver
15/01/20 16:55:13 INFO util.RecurringTimer: Started timer for BlockGenerator at time 1421769314000
15/01/20 16:55:13 INFO receiver.BlockGenerator: Started BlockGenerator
15/01/20 16:55:13 INFO receiver.ReceiverSupervisorImpl: Starting receiver
15/01/20 16:55:13 INFO receiver.ReceiverSupervisorImpl: Called receiver onStart
15/01/20 16:55:13 INFO scheduler.ReceiverTracker: Registered receiver for stream 0 from akka://sparkDriver
15/01/20 16:55:13 INFO receiver.BlockGenerator: Started block pushing thread
15/01/20 16:55:13 INFO dstream.SocketReceiver: Connecting to localhost:9993
15/01/20 16:55:13 INFO dstream.SocketReceiver: Connected to localhost:9993
15/01/20 16:55:14 INFO scheduler.JobScheduler: Added jobs for time 1421769314000 ms
15/01/20 16:55:14 INFO scheduler.JobScheduler: Starting job streaming job 1421769314000 ms.0 from job set of time 1421769314000 ms
15/01/20 16:55:14 INFO spark.SparkContext: Starting job: runJob at PythonRDD.scala:344
15/01/20 16:55:14 INFO scheduler.DAGScheduler: Registering RDD 3 (call at /root/tmp/spark/spark-1.2.0/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py:1206)
15/01/20 16:55:14 INFO scheduler.DAGScheduler: Got job 1 (runJob at PythonRDD.scala:344) with 1 output partitions (allowLocal=true)
15/01/20 16:55:14 INFO scheduler.DAGScheduler: Final stage: Stage 2(runJob at PythonRDD.scala:344)
15/01/20 16:55:14 INFO scheduler.DAGScheduler: Parents of final stage: List(Stage 1)
15/01/20 16:55:14 INFO scheduler.DAGScheduler: Missing parents: List()
15/01/20 16:55:14 INFO scheduler.DAGScheduler: Submitting Stage 2 (PythonRDD[7] at RDD at PythonRDD.scala:43), which has no missing parents
15/01/20 16:55:14 INFO storage.MemoryStore: ensureFreeSpace(5696) called with curMem=55106, maxMem=280248975
15/01/20 16:55:14 INFO storage.MemoryStore: Block broadcast_1 stored as values in memory (estimated size 5.6 KB, free 267.2 MB)
15/01/20 16:55:14 INFO storage.MemoryStore: ensureFreeSpace(4250) called with curMem=60802, maxMem=280248975
15/01/20 16:55:14 INFO storage.MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 4.2 KB, free 267.2 MB)
15/01/20 16:55:14 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:34754 (size: 4.2 KB, free: 267.2 MB)
15/01/20 16:55:14 INFO storage.BlockManagerMaster: Updated info of block broadcast_1_piece0
15/01/20 16:55:14 INFO spark.SparkContext: Created broadcast 1 from getCallSite at DStream.scala:294
15/01/20 16:55:14 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from Stage 2 (PythonRDD[7] at RDD at PythonRDD.scala:43)
15/01/20 16:55:14 INFO scheduler.TaskSchedulerImpl: Adding task set 2.0 with 1 tasks
15/01/20 16:55:15 INFO scheduler.JobScheduler: Added jobs for time 1421769315000 ms
15/01/20 16:55:16 INFO scheduler.JobScheduler: Added jobs for time 1421769316000 ms
15/01/20 16:55:17 INFO scheduler.JobScheduler: Added jobs for time 1421769317000 ms
15/01/20 16:55:18 INFO scheduler.JobScheduler: Added jobs for time 1421769318000 ms
15/01/20 16:55:19 INFO scheduler.JobScheduler: Added jobs for time 1421769319000 ms
15/01/20 16:55:19 INFO storage.MemoryStore: ensureFreeSpace(12) called with curMem=65052, maxMem=280248975
15/01/20 16:55:19 INFO storage.MemoryStore: Block input-0-1421769319200 stored as bytes in memory (estimated size 12.0 B, free 267.2 MB)
15/01/20 16:55:19 INFO storage.BlockManagerInfo: Added input-0-1421769319200 in memory on localhost:34754 (size: 12.0 B, free: 267.2 MB)
15/01/20 16:55:19 INFO storage.BlockManagerMaster: Updated info of block input-0-1421769319200
15/01/20 16:55:19 WARN storage.BlockManager: Block input-0-1421769319200 replicated to only 0 peer(s) instead of 1 peers
15/01/20 16:55:19 INFO receiver.BlockGenerator: Pushed block input-0-1421769319200
15/01/20 16:55:20 INFO scheduler.JobScheduler: Added jobs for time 1421769320000 ms
15/01/20 16:55:21 INFO scheduler.JobScheduler: Added jobs for time 1421769321000 ms
15/01/20 16:55:22 INFO scheduler.JobScheduler: Added jobs for time 1421769322000 ms

I think you should specify more executors when submitting the application. For example:

spark-submit --master local[4] your_file.py

From Learning Spark, Chapter 10:

Do not run Spark Streaming programs locally with master configured as local or local[1]. This allocates only one CPU for tasks and if a receiver is running on it, there is no resource left to process the received data. Use at least local[2] to have more cores.
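Applied to this case, the submission would look something like the following (a sketch; the example takes the hostname and port to read from as arguments, and they must match the socket ncat is listening on):

spark-submit --master local[2] /root/tmp/spark/spark-1.2.0/examples/src/main/python/streaming/network_wordcount.py localhost 9999

With at least two local threads, one thread can run the receiver while the other processes the received batches, so the pprint() output actually appears.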