如何在 RxScala 中使用自定义调度器?
How to use custom Scheduler in RxScala?
我尝试
val executors = Executors.newSingleThreadExecutor()
val scheduler = Schedulers.from(executors)
Observable.just[Int](1,2,3).subscribeOn(scheduler)
输出一个错误
Error:(103, 43) type mismatch;
found : rx.Scheduler
required: rx.lang.scala.Scheduler
Observable.just[Int](1,2,3).subscribeOn(scheduler)
^
如何在RxScala
中使用自定义调度程序?
这里的问题是你在这里混合了 RxJava 和 RxScala 代码。你看,RxScala 只是 RxJava 功能的包装器;前者只转发给后者,没有任何 'real' 实现。这很有用,因为您只需要维护 1 个版本而不是 2 个或更多版本。
你的例子中 scheduler
的类型是 rx.Scheduler
,所以它是一个 RxJava Scheduler
。但是,subscribeOn
要求您提供一个 rx.lang.scala.Scheduler
,这是一个 RxScala Scheduler
。因此,您需要将 RxJava Scheduler
转换为 RxScala.
但是,对于您的情况,有更好的方法:使用 fromExecutor
工厂方法将 Executors.newSingleThreadExecutor
包装到 scala.concurrent.ExecutionContext
中。然后将其包装到 rx.lang.scala.schedulers.ExecutionContextScheduler
中,您就拥有了可以在 subscribeOn
中使用的调度程序。你的代码看起来像这样(我包含了一个打印语句来查看这些东西在哪个线程上运行):
val e = ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor)
val s = ExecutionContextScheduler(e)
Observable.just(1, 2, 3)
.subscribeOn(s)
.doOnNext(x => println(s"thread - ${Thread.currentThread().getName}, value - $x"))
.subscribe()
我尝试
val executors = Executors.newSingleThreadExecutor()
val scheduler = Schedulers.from(executors)
Observable.just[Int](1,2,3).subscribeOn(scheduler)
输出一个错误
Error:(103, 43) type mismatch;
found : rx.Scheduler
required: rx.lang.scala.Scheduler
Observable.just[Int](1,2,3).subscribeOn(scheduler)
^
如何在RxScala
中使用自定义调度程序?
这里的问题是你在这里混合了 RxJava 和 RxScala 代码。你看,RxScala 只是 RxJava 功能的包装器;前者只转发给后者,没有任何 'real' 实现。这很有用,因为您只需要维护 1 个版本而不是 2 个或更多版本。
你的例子中 scheduler
的类型是 rx.Scheduler
,所以它是一个 RxJava Scheduler
。但是,subscribeOn
要求您提供一个 rx.lang.scala.Scheduler
,这是一个 RxScala Scheduler
。因此,您需要将 RxJava Scheduler
转换为 RxScala.
但是,对于您的情况,有更好的方法:使用 fromExecutor
工厂方法将 Executors.newSingleThreadExecutor
包装到 scala.concurrent.ExecutionContext
中。然后将其包装到 rx.lang.scala.schedulers.ExecutionContextScheduler
中,您就拥有了可以在 subscribeOn
中使用的调度程序。你的代码看起来像这样(我包含了一个打印语句来查看这些东西在哪个线程上运行):
val e = ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor)
val s = ExecutionContextScheduler(e)
Observable.just(1, 2, 3)
.subscribeOn(s)
.doOnNext(x => println(s"thread - ${Thread.currentThread().getName}, value - $x"))
.subscribe()