使用 RxScala 进行数据库轮询

Database Polling using RxScala

TableEntries我从 RxScala 开始,我试图想出一个轮询机制来检查数据库的每个时间间隔(比如 20 秒),以检查是否有一些行有任何变化table。

object MyDatabaseService {

  def getAllEntries: List[MyTableEntries] = ???
}

我需要从一个会发出 List[MyTableEntries] 的 Observable 开始。所以我从以下开始:

class MyDBObservable(service: MyDatabaseService, observer: Observer[Seq[MyTableEntries]]) extends Observable[Seq[MyTableEntries]] {

  val o = Observable.interval(10.seconds).map { _ => service.getAllTableEntries }
  o.subscribe(observer)
}

在我传递给函数的 Observer 中,我实现了 onNext、onError 和 onCompleted!但是有几个问题:

  1. 如果我的数据库响应时间超过 30 秒会怎样
  2. 如果我的数据库完全崩溃会怎样?

这是我所做的有效方法吗?建议?

What happens if my database takes more than 30 seconds to respond

假设第一个 service.getAllTableEntries 需要 30 秒,第二个和第三个 service.getAllTableEntries 需要 1 秒。

然后在您的示例中,第一个 service.getAllTableEntries 发生在 10 秒,第二个发生在 40 秒,第三个发生在 41 秒。

基本上不会因为长运行动作而跳过活动。相反,只是延迟事件。

What happens if my database is completely down?

如果是这样,我认为service.getAllTableEntries会抛出异常,你会在onError收到。