Scala futures - how to end on completion?

I inherited some code from a former colleague who had started using futures (in Scala) to process some data in Databricks.

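I split the work into chunks that finish in roughly the same amount of time. But there is no output, and I know they didn't use onSuccess, Await, or anything like that.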

The problem is that the code finishes running (it doesn't return any output), but the cell in Databricks keeps running through the Thread.sleep() at the end.

I'm new to Scala and futures, and I'm not sure how to end the notebook run once all the futures have completed. (Should I call dbutils.notebook.exit() after the future blocks?)

Here is the code:

import scala.concurrent.{Future, blocking, Await}
import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext
import com.databricks.WorkflowException

val numNotebooksInParallel = 15
// If you create too many notebooks in parallel the driver may crash when you submit all of the jobs at once.
// This code limits the number of parallel notebooks.

implicit val ec = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(numNotebooksInParallel))

val ctx = dbutils.notebook.getContext()
// The simplest interface we can have but doesn't
// have protection for submitting too many notebooks in parallel at once

println("starting parallel jobs... hang tight")

Future {
  process("pro","bseg")
  process("prc","bkpf")
  process("prc","bseg")
  process("pr4","bkpf")
  process("pr4","bseg")
  println("done with future1")
}
Future {
  process("pr5","bkpf")
  process("pr5","bseg")
  process("pri","bkpf")
  process("pri","bseg")
  process("pr9","bkpf")
  println("done with future2")
}
Future {
  process("pr9","bseg")
  process("prl","bkpf")
  process("prl","bseg")
  process("pro","bkpf")
  println("done with future3")
}
println("finished futures - yay! :)")

Thread.sleep(5*60*60*1000)
println("thread timed out after 5 hrs... hope it all finished.")
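A minimal sketch of what is going on, using hypothetical names rather than the code above: `Future { ... }` only schedules the block on the execution context and returns immediately, so the next line runs before the work is done. That is why "finished futures - yay! :)" prints right away and only the `Thread.sleep` keeps the cell busy. The 200 ms sleep is an assumed stand-in for a slow `process(...)` call.

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, Executors}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object FireAndForgetDemo {
  // Records the order in which events happen across threads.
  val events = new ConcurrentLinkedQueue[String]()

  def run(): Unit = {
    val pool = Executors.newFixedThreadPool(2)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)

    val f = Future {
      Thread.sleep(200) // hypothetical stand-in for a slow process(...) call
      events.add("future done")
    }
    // This line runs immediately; creating a Future does not wait for it.
    events.add("after Future {...}")

    // Only an explicit wait (Await, or a callback) ties the caller to completion.
    Await.ready(f, 5.seconds)
    pool.shutdown()
  }
}
```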

Typically you would save the futures as values:

val futs = Seq(
  Future {
    process("pro","bseg")
    // and so on
  },
  // then the other futures
)

Then operate on the futures, for example by waiting for all of them:

import scala.concurrent.Await
import scala.concurrent.duration._

Await.result(Future.sequence(futs), 5.hours)
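A self-contained sketch of this pattern, assuming a hypothetical `process(db, table)` that returns a `String` (the real one in the question is not shown) and a small 3-thread pool. Once `Await.result` returns normally, every future has succeeded, so anything placed after it (such as the `dbutils.notebook.exit(...)` call the question asks about) only runs after the work is done.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object WaitForAll {
  // Hypothetical stand-in for the question's process(...) function.
  def process(db: String, table: String): String = s"$db/$table"

  def run(): Seq[Seq[String]] = {
    val pool = Executors.newFixedThreadPool(3)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
    try {
      // Keep a reference to every future so we can wait on them later.
      val futs: Seq[Future[Seq[String]]] = Seq(
        Future(Seq(process("pro", "bseg"), process("prc", "bkpf"))),
        Future(Seq(process("pr5", "bkpf"), process("pr5", "bseg"))),
        Future(Seq(process("pr9", "bseg"), process("prl", "bkpf")))
      )
      // Blocks until all futures succeed (or one fails), for at most 5 hours.
      Await.result(Future.sequence(futs), 5.hours)
    } finally {
      // Threads from newFixedThreadPool are non-daemon; release them when done.
      pool.shutdown()
    }
  }
}
```

Shutting the pool down in `finally` is a choice made for a standalone program, where non-daemon pool threads would otherwise keep the JVM alive; whether it matters inside a Databricks notebook is an assumption, not something the answer states.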

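The future returned by Future.sequence fails with the first failure it encounters (and Await.result then rethrows that exception), or succeeds once all of the futures succeed. The remaining futures are not cancelled; you just stop waiting for them. If you want to wait for all of them to finish even if one fails, you can do something like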

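Await.result(
  futs.foldLeft(Future.unit) { (acc, f) =>
    // Chain onto the accumulator so every future is awaited, not just the last one;
    // recover turns a failure into () so one failed job does not end the wait early.
    acc.flatMap(_ => f.recover { case _ => () })
  },
  5.hours
)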
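If you also want to know which jobs failed, one alternative (not part of the original answer) is to turn each outcome into a `Try` with `Future.transform` (Scala 2.12 or later) and then sequence those. This sketch uses hypothetical job results, with the second job failing on purpose:

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.{Success, Try}

object CollectOutcomes {
  def run(): Seq[Try[String]] = {
    val pool = Executors.newFixedThreadPool(3)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
    try {
      // Hypothetical jobs; the second one throws.
      val futs: Seq[Future[String]] = Seq(
        Future("pro/bseg"),
        Future[String](throw new RuntimeException("prc/bkpf failed")),
        Future("pr4/bseg")
      )
      // transform(t => Success(t)) wraps each outcome (success or failure) in a
      // successful Future[Try[String]], so sequence waits for every job.
      val all: Future[Seq[Try[String]]] =
        Future.sequence(futs.map(_.transform(t => Success(t))))
      Await.result(all, 1.minute)
    } finally pool.shutdown()
  }
}
```

The result keeps the original order, so each `Failure` can be matched back to the job that produced it.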