在 Scala 中惯用地调度与主线程一起死亡的后台工作
Idiomatically scheduling background work that dies with the main thread in Scala
我有一个 scala 程序 运行 运行一段时间然后终止。我想为这个程序提供一个库,在幕后,每 N
秒安排一个异步任务到 运行。我还希望程序在 main
入口点的工作完成时终止,而无需明确告诉后台工作关闭(因为它在库中)。
据我所知,在 Scala 中进行轮询或计划工作的惯用方法是使用 Akka 的 ActorSystem.scheduler.schedule
,但是使用 ActorSystem 会使程序在 main
等待演员后挂起。然后我尝试在主线程上添加另一个 join
s 的演员但失败了,似乎是因为 "Anything that blocks a thread is not advised within Akka"
我可以介绍一个自定义调度程序;我可以将某些东西与轮询 isAlive
检查结合在一起,或者在每个工作人员中添加类似的检查;或者我可以放弃 Akka 并只使用原始线程。
这似乎是一件不太寻常的事情,所以如果有明确的最佳方法,我想使用惯用的 Scala。
我认为没有惯用的 Scala 方式。
JVM 程序在所有非守护线程完成后终止。因此,您可以将任务安排到守护进程线程上的 运行。
所以只需使用 Java 功能:
import java.util.concurrent._
object Main {
def main(args: Array[String]): Unit = {
// Make a ThreadFactory that creates daemon threads.
val threadFactory = new ThreadFactory() {
def newThread(r: Runnable) = {
val t = Executors.defaultThreadFactory().newThread(r)
t.setDaemon(true)
t
}
}
// Create a scheduled pool using this thread factory
val pool = Executors.newSingleThreadScheduledExecutor(threadFactory)
// Schedule some function to run every second after an initial delay of 0 seconds
// This assumes Scala 2.12. In 2.11 you'd have to create a `new Runnable` manually
// Note that scheduling will stop, if there is an exception thrown from the function
pool.scheduleAtFixedRate(() => println("run"), 0, 1, TimeUnit.SECONDS)
Thread.sleep(5000)
}
}
您还可以使用 guava 创建一个带有 new ThreadFactoryBuilder().setDaemon(true).build()
的守护进程线程工厂。
如果您使用 Akka 调度程序,您将依赖经过充分测试的高度调整和优化的实现。不过,我同意,提出演员系统有点沉重。此外,您必须引入对 akka 的依赖。如果你同意,你可以在完成后从 main 显式调用 system.shutdown
,或者将它包装在一个函数中,它将为你完成。
或者,您可以尝试以下方法:
import scala.concurrent._
import ExecutionContext.Implicits.global
object Main extends App {
def repeatEvery[T](timeoutMillis: Int)(f: => T): Future[T] = {
val p = Promise[T]()
val never = p.future
f
def timeout = Future {
Thread.sleep(timeoutMillis)
throw new TimeoutException
}
val failure = Future.firstCompletedOf(List(never, timeout))
failure.recoverWith { case _ => repeatEvery(timeoutMillis)(f) }
}
repeatEvery(1000) {
println("scheduled job called")
}
println("main started doing its work")
Thread.sleep(10000)
println("main finished")
}
打印:
scheduled job called
main started doing its work
scheduled job called
scheduled job called
scheduled job called
scheduled job called
scheduled job called
scheduled job called
scheduled job called
scheduled job called
scheduled job called
main finished
我不喜欢它使用 Thread.sleep
,但这样做是为了避免使用任何其他第 3 方调度程序,并且 Scala Future 不提供超时选项。所以你会在那个调度任务上浪费一个线程,但这就是 Akka 调度程序看起来 to do anyway。不同之处在于,您可能希望整个 JVM 的单个调度程序不会浪费太多线程。我提供的代码虽然更简单,但每个作业都会浪费一个线程。
我有一个 scala 程序 运行 运行一段时间然后终止。我想为这个程序提供一个库,在幕后,每 N
秒安排一个异步任务到 运行。我还希望程序在 main
入口点的工作完成时终止,而无需明确告诉后台工作关闭(因为它在库中)。
据我所知,在 Scala 中进行轮询或计划工作的惯用方法是使用 Akka 的 ActorSystem.scheduler.schedule
,但是使用 ActorSystem 会使程序在 main
等待演员后挂起。然后我尝试在主线程上添加另一个 join
s 的演员但失败了,似乎是因为 "Anything that blocks a thread is not advised within Akka"
我可以介绍一个自定义调度程序;我可以将某些东西与轮询 isAlive
检查结合在一起,或者在每个工作人员中添加类似的检查;或者我可以放弃 Akka 并只使用原始线程。
这似乎是一件不太寻常的事情,所以如果有明确的最佳方法,我想使用惯用的 Scala。
我认为没有惯用的 Scala 方式。
JVM 程序在所有非守护线程完成后终止。因此,您可以将任务安排到守护进程线程上的 运行。
所以只需使用 Java 功能:
import java.util.concurrent._
object Main {
def main(args: Array[String]): Unit = {
// Make a ThreadFactory that creates daemon threads.
val threadFactory = new ThreadFactory() {
def newThread(r: Runnable) = {
val t = Executors.defaultThreadFactory().newThread(r)
t.setDaemon(true)
t
}
}
// Create a scheduled pool using this thread factory
val pool = Executors.newSingleThreadScheduledExecutor(threadFactory)
// Schedule some function to run every second after an initial delay of 0 seconds
// This assumes Scala 2.12. In 2.11 you'd have to create a `new Runnable` manually
// Note that scheduling will stop, if there is an exception thrown from the function
pool.scheduleAtFixedRate(() => println("run"), 0, 1, TimeUnit.SECONDS)
Thread.sleep(5000)
}
}
您还可以使用 guava 创建一个带有 new ThreadFactoryBuilder().setDaemon(true).build()
的守护进程线程工厂。
如果您使用 Akka 调度程序,您将依赖经过充分测试的高度调整和优化的实现。不过,我同意,提出演员系统有点沉重。此外,您必须引入对 akka 的依赖。如果你同意,你可以在完成后从 main 显式调用 system.shutdown
,或者将它包装在一个函数中,它将为你完成。
或者,您可以尝试以下方法:
import scala.concurrent._
import ExecutionContext.Implicits.global
object Main extends App {
def repeatEvery[T](timeoutMillis: Int)(f: => T): Future[T] = {
val p = Promise[T]()
val never = p.future
f
def timeout = Future {
Thread.sleep(timeoutMillis)
throw new TimeoutException
}
val failure = Future.firstCompletedOf(List(never, timeout))
failure.recoverWith { case _ => repeatEvery(timeoutMillis)(f) }
}
repeatEvery(1000) {
println("scheduled job called")
}
println("main started doing its work")
Thread.sleep(10000)
println("main finished")
}
打印:
scheduled job called
main started doing its work
scheduled job called
scheduled job called
scheduled job called
scheduled job called
scheduled job called
scheduled job called
scheduled job called
scheduled job called
scheduled job called
main finished
我不喜欢它使用 Thread.sleep
,但这样做是为了避免使用任何其他第 3 方调度程序,并且 Scala Future 不提供超时选项。所以你会在那个调度任务上浪费一个线程,但这就是 Akka 调度程序看起来 to do anyway。不同之处在于,您可能希望整个 JVM 的单个调度程序不会浪费太多线程。我提供的代码虽然更简单,但每个作业都会浪费一个线程。