如何从持久的石英作业中安排scala Future
How to schedule scala Future from persist quartz job
假设,有一些 scala 代码应该在 java quartz library 的帮助下进行调度。
我们需要将此代码执行的结果存储在作业上下文中,以便在下一次作业执行时访问此结果。
对于合成示例,有一些 CounterService
具有应安排的 inc
函数:
trait CounterService {
def inc(): Int
}
以下 quartz-job 调用 inc
并将其结果成功存储在 JobDataMap
中:
@PersistJobDataAfterExecution
@DisallowConcurrentExecution
class CounterJob extends Job {
val counterService: CounterService = ...
override def execute(context: JobExecutionContext): Unit = {
val newCounterValue: Int = counterService.inc()
val map = context.getJobDetail.getJobDataMap
map.put("counter", newCounterValue)
}
}
我们可以在其他地方随时获得工作结果(如果我们参考scheduler
):
val scheduler: Scheduler = ...
// gets details of our CounterJob which was created and registered in the scheduler
// by the name "counter-job" (it is not shown in our example)
val job = scheduler.getJobDetail(JobKey.jobKey("counter-job"))
// this map will contain the job result which was stored by the key "counter"
val map = job.getJobDataMap.asScala
但是如果我们想从 quartz-job 中执行 async 代码,这个方法就不起作用了。
例如,假设我们的柜台服务是这样的:
trait AsyncCounterService {
def asyncInc(): Future[Int]
}
我们可以尝试通过以下方式来实现我们的工作。但它不能正常工作,因为
方法 CounterJob.execute
可以比 asyncCounterService.asyncInc
更早执行。
而且我们不能将 asyncInc
的结果存储在 JobDataMap
:
@PersistJobDataAfterExecution
@DisallowConcurrentExecution
class CounterJob extends Job {
val counterService: AsyncCounterService = ...
val execContext: ExecutionContext = ...
override def execute(context: JobExecutionContext): Unit = {
// # 1: we can not influence on the execution flow of this future
// from job scheduler.
val counterFuture: Future[Int] = counterService.asyncInc()
counterFuture.map { counterValue: Int =>
val map = context.getJobDetail.getJobDataMap
// #2: this action won't have any effect
map.put("counter", counterValue)
}
}
}
至少是这个解决方案的两个问题,在上面的代码中标记为#1 ...
和#2 ...
评论。
有没有更好的做法来解决这个问题?
换句话说,如何通过存储 Future's
结果来安排 scala Future
从 persist quartz 作业到 JobDetailData
映射?
如果 CounterJob 之后的所有内容都需要具有 counterService 值,那么可以在 CounterJob 中阻塞并等待 Future。反正那时候什么也不能执行,因为还没有计算出值。
import scala.concurrent.{Await,Future}
...
try {
val counterValue = Await.result(counterFuture, 5.seconds)
map.put("counter", counterValue)
} catch {
case t: TimeoutException => ...
case t: Exception => ...
}
如果您在该作业中有多个异步期货,您可以将它们与 flatMap, map
操作的单子链和 for comprehension
或来自 Future
伴随对象的静态辅助方法组合例如 Future.sequence
然后 endresult 将是一个结合所有异步操作的未来,你可以等待 Await
.
通常情况下等待期货被认为是一种不好的做法。因为这会阻止执行程序线程在等待未来完成时执行任何其他操作。
但是在这里,您将另一个作业调度框架与另一个并发范例混合在一起。如上所述,在特定示例中,阻塞是可以的,因为后面的一切都依赖于第一次计算。
如果其他工作可以同时运行,将有多种解决方法:
- 有一种方法可以从工作中 return 未来。
然后你可以等待这个未来完成,然后再安排依赖
职位。
- 作业中有某种自定义事件侦听器机制,可以从作业中触发。
counterFuture.map {context.notify("computationReady")}
- 有特定的 AsyncJob 支持非阻塞 io,它期望 java Future 为 return。然后你可以将 Scala Future 转换为 Java Future
假设,有一些 scala 代码应该在 java quartz library 的帮助下进行调度。
我们需要将此代码执行的结果存储在作业上下文中,以便在下一次作业执行时访问此结果。
对于合成示例,有一些 CounterService
具有应安排的 inc
函数:
trait CounterService {
def inc(): Int
}
以下 quartz-job 调用 inc
并将其结果成功存储在 JobDataMap
中:
@PersistJobDataAfterExecution
@DisallowConcurrentExecution
class CounterJob extends Job {
val counterService: CounterService = ...
override def execute(context: JobExecutionContext): Unit = {
val newCounterValue: Int = counterService.inc()
val map = context.getJobDetail.getJobDataMap
map.put("counter", newCounterValue)
}
}
我们可以在其他地方随时获得工作结果(如果我们参考scheduler
):
val scheduler: Scheduler = ...
// gets details of our CounterJob which was created and registered in the scheduler
// by the name "counter-job" (it is not shown in our example)
val job = scheduler.getJobDetail(JobKey.jobKey("counter-job"))
// this map will contain the job result which was stored by the key "counter"
val map = job.getJobDataMap.asScala
但是如果我们想从 quartz-job 中执行 async 代码,这个方法就不起作用了。 例如,假设我们的柜台服务是这样的:
trait AsyncCounterService {
def asyncInc(): Future[Int]
}
我们可以尝试通过以下方式来实现我们的工作。但它不能正常工作,因为
方法 CounterJob.execute
可以比 asyncCounterService.asyncInc
更早执行。
而且我们不能将 asyncInc
的结果存储在 JobDataMap
:
@PersistJobDataAfterExecution
@DisallowConcurrentExecution
class CounterJob extends Job {
val counterService: AsyncCounterService = ...
val execContext: ExecutionContext = ...
override def execute(context: JobExecutionContext): Unit = {
// # 1: we can not influence on the execution flow of this future
// from job scheduler.
val counterFuture: Future[Int] = counterService.asyncInc()
counterFuture.map { counterValue: Int =>
val map = context.getJobDetail.getJobDataMap
// #2: this action won't have any effect
map.put("counter", counterValue)
}
}
}
至少是这个解决方案的两个问题,在上面的代码中标记为#1 ...
和#2 ...
评论。
有没有更好的做法来解决这个问题?
换句话说,如何通过存储 Future's
结果来安排 scala Future
从 persist quartz 作业到 JobDetailData
映射?
如果 CounterJob 之后的所有内容都需要具有 counterService 值,那么可以在 CounterJob 中阻塞并等待 Future。反正那时候什么也不能执行,因为还没有计算出值。
import scala.concurrent.{Await,Future}
...
try {
val counterValue = Await.result(counterFuture, 5.seconds)
map.put("counter", counterValue)
} catch {
case t: TimeoutException => ...
case t: Exception => ...
}
如果您在该作业中有多个异步期货,您可以将它们与 flatMap, map
操作的单子链和 for comprehension
或来自 Future
伴随对象的静态辅助方法组合例如 Future.sequence
然后 endresult 将是一个结合所有异步操作的未来,你可以等待 Await
.
通常情况下等待期货被认为是一种不好的做法。因为这会阻止执行程序线程在等待未来完成时执行任何其他操作。
但是在这里,您将另一个作业调度框架与另一个并发范例混合在一起。如上所述,在特定示例中,阻塞是可以的,因为后面的一切都依赖于第一次计算。
如果其他工作可以同时运行,将有多种解决方法:
- 有一种方法可以从工作中 return 未来。 然后你可以等待这个未来完成,然后再安排依赖 职位。
- 作业中有某种自定义事件侦听器机制,可以从作业中触发。
counterFuture.map {context.notify("computationReady")}
- 有特定的 AsyncJob 支持非阻塞 io,它期望 java Future 为 return。然后你可以将 Scala Future 转换为 Java Future