Scala 期货 - 如何在完成时结束?
Scala futures - how to end on completion?
我从一位前同事那里继承了一些代码,他开始使用 futures(在 Scala 中)在 Databricks 中处理一些数据。
我把它分成在相似时间段内完成的块。但是没有输出,我知道他们没有使用 onSuccess 或 Await 或任何东西。
问题是,代码完成 运行(不 return 输出)但是 Databricks 中的块一直执行到 thread.sleep() 部分。
我是 Scala 和 futures 的新手,我不确定如何在所有 futures 完成后退出 notebook 运行(我应该在 future 块之后使用 dbutils.notebook.exit() ?)
代码如下:
import scala.concurrent.{Future, blocking, Await}
import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext
import com.databricks.WorkflowException
val numNotebooksInParallel = 15
// If you create too many notebooks in parallel the driver may crash when you submit all of the jobs at once.
// This code limits the number of parallel notebooks.
implicit val ec = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(numNotebooksInParallel))
val ctx = dbutils.notebook.getContext()
// The simplest interface we can have but doesn't
// have protection for submitting to many notebooks in parallel at once
println("starting parallel jobs... hang tight")
Future {
process("pro","bseg")
process("prc","bkpf")
process("prc","bseg")
process("pr4","bkpf")
process("pr4","bseg")
println("done with future1")
}
Future {
process("pr5","bkpf")
process("pr5","bseg")
process("pri","bkpf")
process("pri","bseg")
process("pr9","bkpf")
println("done with future2")
}
Future {
process("pr9","bseg")
process("prl","bkpf")
process("prl","bseg")
process("pro","bkpf")
println("done with future3")
}
println("finished futures - yay! :)")
Thread.sleep(5*60*60*1000)
println("thread timed out after 5 hrs... hope it all finished.")
通常会将期货保存为值:
val futs = Seq(
Future {
process("pro","bseg")
// and so on
},
// then the other futures
)
然后对期货进行操作:
import scala.concurrent.Await
import scala.concurrent.duration._
Await.result(Future.sequence(futs), 5.hours)
Future.sequence
将在第一个失败或全部成功后停止。如果你想让它们全部 运行 即使一个失败,你可以做类似
Await.result(
futs.foldLeft(Future.unit) { (_, f) =>
f.recover {
case _ => ()
}
},
5.hours
)
我从一位前同事那里继承了一些代码,他开始使用 futures(在 Scala 中)在 Databricks 中处理一些数据。
我把它分成在相似时间段内完成的块。但是没有输出,我知道他们没有使用 onSuccess 或 Await 或任何东西。
问题是,代码完成 运行(不 return 输出)但是 Databricks 中的块一直执行到 thread.sleep() 部分。
我是 Scala 和 futures 的新手,我不确定如何在所有 futures 完成后退出 notebook 运行(我应该在 future 块之后使用 dbutils.notebook.exit() ?)
代码如下:
import scala.concurrent.{Future, blocking, Await}
import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext
import com.databricks.WorkflowException
val numNotebooksInParallel = 15
// If you create too many notebooks in parallel the driver may crash when you submit all of the jobs at once.
// This code limits the number of parallel notebooks.
implicit val ec = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(numNotebooksInParallel))
val ctx = dbutils.notebook.getContext()
// The simplest interface we can have but doesn't
// have protection for submitting to many notebooks in parallel at once
println("starting parallel jobs... hang tight")
Future {
process("pro","bseg")
process("prc","bkpf")
process("prc","bseg")
process("pr4","bkpf")
process("pr4","bseg")
println("done with future1")
}
Future {
process("pr5","bkpf")
process("pr5","bseg")
process("pri","bkpf")
process("pri","bseg")
process("pr9","bkpf")
println("done with future2")
}
Future {
process("pr9","bseg")
process("prl","bkpf")
process("prl","bseg")
process("pro","bkpf")
println("done with future3")
}
println("finished futures - yay! :)")
Thread.sleep(5*60*60*1000)
println("thread timed out after 5 hrs... hope it all finished.")
通常会将期货保存为值:
val futs = Seq(
Future {
process("pro","bseg")
// and so on
},
// then the other futures
)
然后对期货进行操作:
import scala.concurrent.Await
import scala.concurrent.duration._
Await.result(Future.sequence(futs), 5.hours)
Future.sequence
将在第一个失败或全部成功后停止。如果你想让它们全部 运行 即使一个失败,你可以做类似
Await.result(
futs.foldLeft(Future.unit) { (_, f) =>
f.recover {
case _ => ()
}
},
5.hours
)