How to stop Spark Streaming when the data source has run out
I have a Spark Streaming job that reads from Kafka every 5 seconds, does some transformations on the incoming data, and then writes to the file system.
This doesn't really need to be a streaming job; really, I just want to run it once a day to drain the messages onto the file system. I'm not sure how to stop the job, though.
If I pass a timeout to streamingContext.awaitTermination, it doesn't stop the process; all it does is cause the process to throw errors when it comes time to iterate over the stream (see the error below).
What's the best way to accomplish what I'm trying to do?
This is for Spark 1.6 on Python.
Edit:
Thanks to @marios, the solution turned out to be:
ssc.start()
ssc.awaitTermination(10)
ssc.stop()
This runs the script for ten seconds before stopping.
Simplified code:
from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

conf = SparkConf().setAppName("Vehicle Data Consolidator").set('spark.files.overwrite', 'true')
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 5)
# kafkaParams and topicPartitions are defined elsewhere in the full script
stream = KafkaUtils.createStream(
    ssc,
    kafkaParams["zookeeper.connect"],
    "vehicle-data-importer",
    topicPartitions,
    kafkaParams)
stream.saveAsTextFiles('stream-output/kafka-vehicle-data')
ssc.start()
ssc.awaitTermination(10)
Error:
16/01/29 15:05:44 INFO BlockManagerInfo: Added input-0-1454097944200 in memory on localhost:58960 (size: 3.0 MB, free: 48.1 MB)
16/01/29 15:05:44 WARN BlockManager: Block input-0-1454097944200 replicated to only 0 peer(s) instead of 1 peers
16/01/29 15:05:44 INFO BlockGenerator: Pushed block input-0-1454097944200
16/01/29 15:05:45 ERROR JobScheduler: Error generating jobs for time 1454097945000 ms
py4j.Py4JException: Cannot obtain a new communication channel
at py4j.CallbackClient.sendCommand(CallbackClient.java:232)
at py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:111)
at com.sun.proxy.$Proxy14.call(Unknown Source)
at org.apache.spark.streaming.api.python.TransformFunction.callPythonTransformFunction(PythonDStream.scala:92)
at org.apache.spark.streaming.api.python.TransformFunction.apply(PythonDStream.scala:78)
at org.apache.spark.streaming.api.python.PythonTransformedDStream.compute(PythonDStream.scala:230)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$$anonfun$$anonfun$apply.apply(DStream.scala:352)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$$anonfun$$anonfun$apply.apply(DStream.scala:352)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$$anonfun.apply(DStream.scala:351)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$$anonfun.apply(DStream.scala:351)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute.apply(DStream.scala:346)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute.apply(DStream.scala:344)
at scala.Option.orElse(Option.scala:257)
at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:341)
at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:47)
at org.apache.spark.streaming.DStreamGraph$$anonfun.apply(DStreamGraph.scala:115)
at org.apache.spark.streaming.DStreamGraph$$anonfun.apply(DStreamGraph.scala:114)
at scala.collection.TraversableLike$$anonfun$flatMap.apply(TraversableLike.scala:251)
at scala.collection.TraversableLike$$anonfun$flatMap.apply(TraversableLike.scala:251)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:114)
at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun.apply(JobGenerator.scala:248)
at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun.apply(JobGenerator.scala:246)
at scala.util.Try$.apply(Try.scala:161)
at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:246)
at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:181)
at org.apache.spark.streaming.scheduler.JobGenerator$$anon.onReceive(JobGenerator.scala:87)
at org.apache.spark.streaming.scheduler.JobGenerator$$anon.onReceive(JobGenerator.scala:86)
at org.apache.spark.util.EventLoop$$anon.run(EventLoop.scala:48)
16/01/29 15:05:45 INFO MemoryStore: Block input-0-1454097944800 stored as bytes in memory (estimated size 3.0 MB, free 466.1 MB)
16/01/29 15:05:45 INFO BlockManagerInfo: Added input-0-1454097944800 in memory on localhost:58960 (size: 3.0 MB, free: 45.1 MB)
It seems the right method to call is awaitTerminationOrTimeout(self, timeout).
I'm not sure whether it also stops the streaming context, so you may want to call ssc.stop() right after the timeout elapses.
ssc.start()
ssc.awaitTerminationOrTimeout(10)
ssc.stop()
Note: have a look at this similar question.
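In PySpark, awaitTerminationOrTimeout also returns whether the context terminated within the timeout, so a slightly more defensive variant (a sketch of my own, not part of the original answer) only calls stop when the timeout actually expired:

ssc.start()
# Returns True if the context stopped on its own within the timeout,
# False if the timeout (in seconds) elapsed first.
finished = ssc.awaitTerminationOrTimeout(10)
if not finished:
    # stopGraceFully=True lets already-received batches finish before the
    # streaming context (and the underlying SparkContext) is stopped.
    ssc.stop(stopSparkContext=True, stopGraceFully=True)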
Try the Kafka "consumer.timeout.ms" parameter, which will gracefully end the KafkaReceiver (from the Kafka 0.8 configuration):
Throw a timeout exception to the consumer if no message is available for consumption after the specified interval
HDF = KafkaUtils.createStream(ssc, topics={strLoc : 1}, kafkaParams={"consumer.timeout.ms":"20000" }, zkQuorum='xxx:2181', groupId='xxx-consumer-group')
After that you won't receive any new Kafka messages in the current streaming run and will always get empty RDDs.
Then check the count of empty RDDs in DStream.foreachRDD(func); if you keep getting consecutive empty RDDs, terminate the streaming run, as sketched below.
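A minimal sketch of that empty-RDD check, reusing stream and ssc from the question's snippet (the batch limit, the Event flag, and the driver-side polling loop are assumptions of mine, not part of this answer):

import threading

EMPTY_BATCH_LIMIT = 3          # assumed threshold for "the source is drained"
empty_batches = [0]            # a list so the foreachRDD closure can mutate it
drained = threading.Event()    # set once enough consecutive empty batches arrive

def count_empty(rdd):
    # foreachRDD functions run on the driver, so mutating these locals is fine.
    if rdd.isEmpty():
        empty_batches[0] += 1
        if empty_batches[0] >= EMPTY_BATCH_LIMIT:
            drained.set()
    else:
        empty_batches[0] = 0

stream.foreachRDD(count_empty)
stream.saveAsTextFiles('stream-output/kafka-vehicle-data')
ssc.start()

# Poll from the driver's main thread rather than calling ssc.stop() inside
# foreachRDD, which sidesteps the Spark 1.6 deadlock described in the next answer.
while not ssc.awaitTerminationOrTimeout(5):
    if drained.is_set():
        ssc.stop(stopSparkContext=True, stopGraceFully=True)
        break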
The problem here is that, as of Spark 1.6, calling ssc.stop from the same thread as the DStream processing thread creates a deadlock, because stop waits for the poller thread to finish. So the stop call has to come from another thread.
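A hedged illustration of that point (the helper name and the extra thread are mine): if the decision to shut down has to be made inside a foreachRDD callback, hand the actual stop off to a separate thread instead of calling it in place.

import threading

def stop_async(ssc):
    # Run the stop on a freshly spawned thread so the DStream processing
    # thread that triggered it can return and let the current batch finish,
    # avoiding the Spark 1.6 same-thread deadlock described above.
    def _stop():
        ssc.stop(stopSparkContext=True, stopGraceFully=True)
    threading.Thread(target=_stop, name="streaming-stopper").start()

The foreachRDD callback would then just call stop_async(ssc) and return, while the stopper thread waits for the streaming context to wind down.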