如何在 RxScala 中使用自定义调度器?

How to use custom Scheduler in RxScala?

我尝试

val executors = Executors.newSingleThreadExecutor()
val scheduler = Schedulers.from(executors)
Observable.just[Int](1,2,3).subscribeOn(scheduler)

输出一个错误

Error:(103, 43) type mismatch;
found   : rx.Scheduler
required: rx.lang.scala.Scheduler
Observable.just[Int](1,2,3).subscribeOn(scheduler)
                                      ^ 

如何在RxScala中使用自定义调度程序?

这里的问题是你在这里混合了 RxJava 和 RxScala 代码。你看,RxScala 只是 RxJava 功能的包装器;前者只转发给后者,没有任何 'real' 实现。这很有用,因为您只需要维护 1 个版本而不是 2 个或更多版本。

你的例子中 scheduler 的类型是 rx.Scheduler,所以它是一个 RxJava Scheduler。但是,subscribeOn 要求您提供一个 rx.lang.scala.Scheduler,这是一个 RxScala Scheduler。因此,您需要将 RxJava Scheduler 转换为 RxScala.

但是,对于您的情况,有更好的方法:使用 fromExecutor 工厂方法将 Executors.newSingleThreadExecutor 包装到 scala.concurrent.ExecutionContext 中。然后将其包装到 rx.lang.scala.schedulers.ExecutionContextScheduler 中,您就拥有了可以在 subscribeOn 中使用的调度程序。你的代码看起来像这样(我包含了一个打印语句来查看这些东西在哪个线程上运行):

val e = ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor)
val s = ExecutionContextScheduler(e)

Observable.just(1, 2, 3)
    .subscribeOn(s)
    .doOnNext(x => println(s"thread - ${Thread.currentThread().getName}, value - $x"))
    .subscribe()