如何在 Spark 流中创建停止条件?
How to create a stop condition on Spark streaming?
我想使用 spark streaming 从 HDFS 读取数据。这个想法是另一个程序将继续将新文件上传到 HDFS 目录,我的 spark 流作业将处理该目录。但是,我也想有一个结束条件。也就是说,将文件上传到 HDFS 的程序可以向 spark streaming 程序发出信号,表明它已完成所有文件的上传。
举个简单的例子,取Here的程序。代码如下所示。假设另一个程序正在上传这些文件,那么该程序如何以编程方式向火花流程序发出结束条件信号(不需要我们按 CTRL+C)?
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
object StreamingWordCount {
def main(args: Array[String]) {
if (args.length < 2) {
System.err.println("Usage StreamingWordCount <input-directory> <output-directory>")
System.exit(0)
}
val inputDir=args(0)
val output=args(1)
val conf = new SparkConf().setAppName("Spark Streaming Example")
val streamingContext = new StreamingContext(conf, Seconds(10))
val lines = streamingContext.textFileStream(inputDir)
val words = lines.flatMap(_.split(" "))
val wc = words.map(x => (x, 1))
wc.foreachRDD(rdd => {
val counts = rdd.reduceByKey((x, y) => x + y)
counts.saveAsTextFile(output)
val collectedCounts = counts.collect
collectedCounts.foreach(c => println(c))
}
)
println("StreamingWordCount: streamingContext start")
streamingContext.start()
println("StreamingWordCount: await termination")
streamingContext.awaitTermination()
println("StreamingWordCount: done!")
}
}
好的,我明白了。基本上,您从调用 ssc.stop()
的地方创建另一个线程,以指示流处理停止。比如像这样。
val ssc = new StreamingContext(sparkConf, Seconds(1))
//////////////////////////////////////////////////////////////////////
val thread = new Thread
{
override def run
{
....
// On reaching the end condition
ssc.stop()
}
}
thread.start
//////////////////////////////////////////////////////////////////////
val lines = ssc.textFileStream("inputDir")
.....
我想使用 spark streaming 从 HDFS 读取数据。这个想法是另一个程序将继续将新文件上传到 HDFS 目录,我的 spark 流作业将处理该目录。但是,我也想有一个结束条件。也就是说,将文件上传到 HDFS 的程序可以向 spark streaming 程序发出信号,表明它已完成所有文件的上传。
举个简单的例子,取Here的程序。代码如下所示。假设另一个程序正在上传这些文件,那么该程序如何以编程方式向火花流程序发出结束条件信号(不需要我们按 CTRL+C)?
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
object StreamingWordCount {
def main(args: Array[String]) {
if (args.length < 2) {
System.err.println("Usage StreamingWordCount <input-directory> <output-directory>")
System.exit(0)
}
val inputDir=args(0)
val output=args(1)
val conf = new SparkConf().setAppName("Spark Streaming Example")
val streamingContext = new StreamingContext(conf, Seconds(10))
val lines = streamingContext.textFileStream(inputDir)
val words = lines.flatMap(_.split(" "))
val wc = words.map(x => (x, 1))
wc.foreachRDD(rdd => {
val counts = rdd.reduceByKey((x, y) => x + y)
counts.saveAsTextFile(output)
val collectedCounts = counts.collect
collectedCounts.foreach(c => println(c))
}
)
println("StreamingWordCount: streamingContext start")
streamingContext.start()
println("StreamingWordCount: await termination")
streamingContext.awaitTermination()
println("StreamingWordCount: done!")
}
}
好的,我明白了。基本上,您从调用 ssc.stop()
的地方创建另一个线程,以指示流处理停止。比如像这样。
val ssc = new StreamingContext(sparkConf, Seconds(1))
//////////////////////////////////////////////////////////////////////
val thread = new Thread
{
override def run
{
....
// On reaching the end condition
ssc.stop()
}
}
thread.start
//////////////////////////////////////////////////////////////////////
val lines = ssc.textFileStream("inputDir")
.....