如何优雅地停止笔记本流式传输作业?
How to stop a notebook streaming job gracefully?
我有一个流应用程序 运行 到 Databricks notebook 作业 (https://docs.databricks.com/jobs.html)。我希望能够使用 stream.start()
方法返回的 StreamingQuery
class 的 stop()
方法优雅地停止流式处理作业。这当然需要访问提到的流实例或访问 运行 作业本身的上下文。在第二种情况下,代码如下所示:
spark.sqlContext.streams.get("some_streaming_uuid").stop()
上面的代码应该从不同的笔记本作业中执行,让我们称之为 stop_streaming_job
尽管我无法找到访问作业上下文和执行上面的 Scala 代码的方法。有什么方法可以用数据块笔记本实现吗?
解决此问题的一种方法是使用数据块文件系统 (dbfs) 或您的本地文件系统。这个想法是通过实现一个名为 awaitExternalTermination
的新函数来扩展 Spark StreamingQuery
class 的功能。该解决方案在给定的 DBFS 目录中创建一个新文件,该文件充当负责流作业生命周期的标志。只要文件存在于给定目录中,作业就会继续 运行。接下来是文件观察器的实现,它是 StreamingQuery
class 的扩展方法并使用 Scala 期货:
object extensions {
import fs._
object FileSystemType extends Enumeration {
val DBFS, LocalFileSystem = Value
}
implicit class FileSystemStopStreamingQuery(val self :StreamingQuery) extends AnyVal {
/**
* Extension method for StreamingQuery, it waits for an external call to delete the streaming file. When that happens it will call the stop method
* of the current StreamingQuery instance.
*
* @param streamStopDir dir to be watched
* @param jobName the job unique identifier/the file name
* @param fsType DFFS or LocalFileSystem
*/
def awaitExternalTermination(streamStopDir :String, jobName :String, fsType : FileSystemType.Value): Unit ={
if(streamStopDir == null || streamStopDir.isEmpty)
throw new IllegalArgumentException("streamStopDir can't be null or empty.")
if(jobName == null || jobName.isEmpty)
throw new IllegalArgumentException("jobName can't be null or empty.")
val fsWrapper :FileSystemWrapper = fsType match {
case FileSystemType.DBFS => new DbfsWrapper(streamStopDir, jobName)
case FileSystemType.LocalFileSystem => new LocalFileSystemWrapper(streamStopDir, jobName)
case _ => throw new IllegalArgumentException("Invalid file system provided.")
}
val stopWatchFuture: Future[Boolean] = Future {
if(!fsWrapper.targetFileExists)
fsWrapper.createTargetFile(self.id.toString)
while (self.isActive && fsWrapper.targetFileExists){
val random: ThreadLocalRandom = ThreadLocalRandom.current()
val r = random.nextLong(10, 100 + 1) // returns value between 10 and 100
Thread.sleep(r)
}
if(!fsWrapper.targetFileExists){
self.stop()
true
}
else
false
}
var output = "success"
stopWatchFuture onComplete {
case Success(result : Boolean) => if (!result) {
output = s"failure: file not found."
}
case Failure(t) => output = s"failure: ${t.getMessage}."
}
self.awaitTermination()
}
}
}
以及DBFS包装器的实现class:
import com.databricks.dbutils_v1.DBUtilsHolder.dbutils
class DbfsWrapper(val stopDir: String, val targetFile: String) extends FileSystemWrapper {
override def targetFileExists(): Boolean = {
try {
dbutils.fs.ls(targetPath).size > 0
}
catch {
case _: java.io.FileNotFoundException => false
}
}
override def createTargetFile(content: String): Unit = {
dbutils.fs.put(targetPath, content)
}
}
要停止流式传输作业,只需在使用 DBFS 时使用 %fs rm -r your_path
删除提到的文件,或者对于本地 FS 只需 rm -r your_path
。
完整代码可见here.
我有一个流应用程序 运行 到 Databricks notebook 作业 (https://docs.databricks.com/jobs.html)。我希望能够使用 stream.start()
方法返回的 StreamingQuery
class 的 stop()
方法优雅地停止流式处理作业。这当然需要访问提到的流实例或访问 运行 作业本身的上下文。在第二种情况下,代码如下所示:
spark.sqlContext.streams.get("some_streaming_uuid").stop()
上面的代码应该从不同的笔记本作业中执行,让我们称之为 stop_streaming_job
尽管我无法找到访问作业上下文和执行上面的 Scala 代码的方法。有什么方法可以用数据块笔记本实现吗?
解决此问题的一种方法是使用数据块文件系统 (dbfs) 或您的本地文件系统。这个想法是通过实现一个名为 awaitExternalTermination
的新函数来扩展 Spark StreamingQuery
class 的功能。该解决方案在给定的 DBFS 目录中创建一个新文件,该文件充当负责流作业生命周期的标志。只要文件存在于给定目录中,作业就会继续 运行。接下来是文件观察器的实现,它是 StreamingQuery
class 的扩展方法并使用 Scala 期货:
object extensions {
import fs._
object FileSystemType extends Enumeration {
val DBFS, LocalFileSystem = Value
}
implicit class FileSystemStopStreamingQuery(val self :StreamingQuery) extends AnyVal {
/**
* Extension method for StreamingQuery, it waits for an external call to delete the streaming file. When that happens it will call the stop method
* of the current StreamingQuery instance.
*
* @param streamStopDir dir to be watched
* @param jobName the job unique identifier/the file name
* @param fsType DFFS or LocalFileSystem
*/
def awaitExternalTermination(streamStopDir :String, jobName :String, fsType : FileSystemType.Value): Unit ={
if(streamStopDir == null || streamStopDir.isEmpty)
throw new IllegalArgumentException("streamStopDir can't be null or empty.")
if(jobName == null || jobName.isEmpty)
throw new IllegalArgumentException("jobName can't be null or empty.")
val fsWrapper :FileSystemWrapper = fsType match {
case FileSystemType.DBFS => new DbfsWrapper(streamStopDir, jobName)
case FileSystemType.LocalFileSystem => new LocalFileSystemWrapper(streamStopDir, jobName)
case _ => throw new IllegalArgumentException("Invalid file system provided.")
}
val stopWatchFuture: Future[Boolean] = Future {
if(!fsWrapper.targetFileExists)
fsWrapper.createTargetFile(self.id.toString)
while (self.isActive && fsWrapper.targetFileExists){
val random: ThreadLocalRandom = ThreadLocalRandom.current()
val r = random.nextLong(10, 100 + 1) // returns value between 10 and 100
Thread.sleep(r)
}
if(!fsWrapper.targetFileExists){
self.stop()
true
}
else
false
}
var output = "success"
stopWatchFuture onComplete {
case Success(result : Boolean) => if (!result) {
output = s"failure: file not found."
}
case Failure(t) => output = s"failure: ${t.getMessage}."
}
self.awaitTermination()
}
}
}
以及DBFS包装器的实现class:
import com.databricks.dbutils_v1.DBUtilsHolder.dbutils
class DbfsWrapper(val stopDir: String, val targetFile: String) extends FileSystemWrapper {
override def targetFileExists(): Boolean = {
try {
dbutils.fs.ls(targetPath).size > 0
}
catch {
case _: java.io.FileNotFoundException => false
}
}
override def createTargetFile(content: String): Unit = {
dbutils.fs.put(targetPath, content)
}
}
要停止流式传输作业,只需在使用 DBFS 时使用 %fs rm -r your_path
删除提到的文件,或者对于本地 FS 只需 rm -r your_path
。
完整代码可见here.