In Spark Streaming, is there a way to detect when a batch has finished?
I'm using Spark 1.6.0 with Cloudera 5.8.3.
I have a DStream object with a number of transformations defined on it:
val stream = KafkaUtils.createDirectStream[...](...)
val mappedStream = stream.transform { ... }.map { ... }
mappedStream.foreachRDD { ... }
mappedStream.foreachRDD { ... }
mappedStream.map { ... }.foreachRDD { ... }
Is there a way to register a last foreachRDD that is guaranteed to execute last, and only once the foreachRDD calls above have finished executing?
In other words: when the Spark UI shows the job as completed - that is when I want to run a lightweight function.
Is there anything in the API that lets me achieve this?
Thanks
Using a streaming listener should solve your problem:
(Sorry, it's a Java example.)
ssc.addStreamingListener(new JobListener());
// ...
class JobListener implements StreamingListener {
    @Override
    public void onBatchCompleted(StreamingListenerBatchCompleted batchCompleted) {
        System.out.println("Batch completed, total delay: " + batchCompleted.batchInfo().totalDelay().get().toString() + " ms");
    }
    /* other listener methods snipped */
}
Start a stream with the name myStreamName and wait until it's up -
deltaStreamingQuery = (streamingDF
.writeStream
.format("delta")
.queryName(myStreamName)
.start(writePath)
)
untilStreamIsReady(myStreamName)
PySpark version - wait for the stream to start:
def getActiveStreams():
    try:
        return spark.streams.active
    except Exception:
        # In rare cases this call can throw an ignorable error.
        print("Unable to iterate over all active streams - using an empty list instead.")
        return []

def untilStreamIsReady(name, progressions=3):
    import time
    queries = [q for q in getActiveStreams() if q.name == name]
    while len(queries) == 0 or len(queries[0].recentProgress) < progressions:
        time.sleep(5)  # Give it a few seconds before polling again
        queries = [q for q in getActiveStreams() if q.name == name]
    print("The stream {} is active and ready.".format(name))
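One caveat with the loop above: if the stream never comes up, it polls forever. Below is a minimal bounded sketch - `untilStreamIsReadyOrTimeout` is a hypothetical name, and `getActiveStreams` is passed in as a parameter only so the sketch stands alone without a Spark session:

```python
import time

def untilStreamIsReadyOrTimeout(name, getActiveStreams, progressions=3,
                                poll_seconds=5, max_polls=60):
    """Wait until the query named `name` has reported at least
    `progressions` progress updates; raise after `max_polls` attempts."""
    for _ in range(max_polls):
        queries = [q for q in getActiveStreams() if q.name == name]
        if queries and len(queries[0].recentProgress) >= progressions:
            print("The stream {} is active and ready.".format(name))
            return
        time.sleep(poll_seconds)
    raise TimeoutError("Stream {} did not become ready after {} polls.".format(name, max_polls))
```

In a notebook you would call it as `untilStreamIsReadyOrTimeout(myStreamName, getActiveStreams)`, using the `getActiveStreams` helper defined above.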
Spark Scala version - wait for the stream to start:
def getActiveStreams(): Seq[org.apache.spark.sql.streaming.StreamingQuery] = {
  try {
    spark.streams.active
  } catch {
    case e: Throwable =>
      // In extreme cases, this call may throw an ignorable error.
      println("Unable to iterate over all active streams - using an empty list instead.")
      Seq.empty[org.apache.spark.sql.streaming.StreamingQuery]
  }
}

def untilStreamIsReady(name: String, progressions: Int = 3): Unit = {
  var queries = getActiveStreams().filter(_.name == name)
  while (queries.isEmpty || queries(0).recentProgress.length < progressions) {
    Thread.sleep(5 * 1000) // Give it a few seconds before polling again
    queries = getActiveStreams().filter(_.name == name)
  }
  println(s"The stream $name is active and ready.")
}
Back to the original question: add another version of this function that first waits for the stream to start, and then waits once more (just negate the wait condition) for it to finish. The completed version looks like this -
untilStreamIsReady(myStreamName)
untilStreamIsDone(myStreamName) // reverse of untilStreamIsReady - wait until myStreamName is no longer in the active list
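`untilStreamIsDone` itself isn't shown in the answer; here is a minimal sketch of the negated wait. The function name and its `getActiveStreams` parameter are illustrative - in a real notebook you would rely on the `getActiveStreams` helper defined above:

```python
import time

def untilStreamIsDone(name, getActiveStreams, poll_seconds=5):
    """Block while a query named `name` is still in the active list;
    return once it has disappeared, i.e. the stream has stopped."""
    while any(q.name == name for q in getActiveStreams()):
        time.sleep(poll_seconds)
    print("The stream {} is done.".format(name))
```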