RxJava 异步订阅

RxJava async subscription

我有一个任务列表,应该在一个新线程中一个一个地处理,然后结果应该由某个主线程在一个方法中显示。 但是这似乎不起作用,flatMap 方法在主线程中被调用。

在这种情况下,为什么 subscribeOn 方法不处理 "thread switch"?

在另一个线程中执行某些工作的更好模式是什么? (除了使用 Observable.create 并手动创建一个新线程,这非常冗长)

List<Task> tasks = ...;
Observable.from(tasks)
          .flatMap(task -> {
                // should be handled in a new thread
                try {
                    return Observable.just(task.call());
                } catch (Exception e) {
                    log.error("Error", e);
                }

                return Observable.empty();
            })
            .subscribeOn(Schedulers.newThread())
            .observeOn(MySchedulers.main())
            .subscribe(this::show); // subscribe called from main thread

警告:我不是 Java 程序员,而是 C#,所以所有奇怪的驼峰式方法名称都让我感到害怕和困惑。

如果您想为 flatMap 操作创建一个新线程,则 subscribeOn 位置错误。将其插入 fromflatMap 之间。有关 subscribeOnobserveOn 的完整解释,请参阅 this answer - 它是为 .NET 编写的,但原理是相同的。

我不熟悉 Java 中的任务,所以我不确定您的 Task 是否像 .NET 的 Task 以及 task.call() 是否是异步的并启动自己的线程 - 我猜不是你的问题,因为你说“......应该在 a 新线程中逐一处理的任务列表”。

A newThread 调度程序使用一个新线程 每个订阅者 - 由于 flatMap 将进行单个订阅,因此所有 task.call 调用都将是在同一个线程上创建,虽然这将来自 from 操作员的线程。

如果 task.call 实际上是异步的,那么结果将返回,但是它引入了并发性并且独立于 Rx 的语义。

无论哪种方式,(正确放置的)observeOn 都会将结果传递给主线程上的 this::show