RxJava 异步订阅
RxJava async subscription
我有一个任务列表,应该在一个新线程中一个一个地处理,然后结果应该由某个主线程在一个方法中显示。
但是这似乎不起作用,flatMap
方法在主线程中被调用。
在这种情况下,为什么 subscribeOn
方法不处理 "thread switch"?
在另一个线程中执行某些工作的更好模式是什么? (除了使用 Observable.create
并手动创建一个新线程,这非常冗长)
List<Task> tasks = ...;
Observable.from(tasks)
.flatMap(task -> {
// should be handled in a new thread
try {
return Observable.just(task.call());
} catch (Exception e) {
log.error("Error", e);
}
return Observable.empty();
})
.subscribeOn(Schedulers.newThread())
.observeOn(MySchedulers.main())
.subscribe(this::show); // subscribe called from main thread
警告:我不是 Java 程序员,而是 C#,所以所有奇怪的驼峰式方法名称都让我感到害怕和困惑。
如果您想为 flatMap
操作创建一个新线程,则 subscribeOn
位置错误。将其插入 from
和 flatMap
之间。有关 subscribeOn
和 observeOn
的完整解释,请参阅 this answer - 它是为 .NET 编写的,但原理是相同的。
我不熟悉 Java 中的任务,所以我不确定您的 Task
是否像 .NET 的 Task
以及 task.call()
是否是异步的并启动自己的线程 - 我猜不是你的问题,因为你说“......应该在 a 新线程中逐一处理的任务列表”。
A newThread
调度程序使用一个新线程 每个订阅者 - 由于 flatMap
将进行单个订阅,因此所有 task.call
调用都将是在同一个线程上创建,虽然这将来自 from
操作员的线程。
如果 task.call
实际上是异步的,那么结果将返回,但是它引入了并发性并且独立于 Rx 的语义。
无论哪种方式,(正确放置的)observeOn
都会将结果传递给主线程上的 this::show
。
我有一个任务列表,应该在一个新线程中一个一个地处理,然后结果应该由某个主线程在一个方法中显示。
但是这似乎不起作用,flatMap
方法在主线程中被调用。
在这种情况下,为什么 subscribeOn
方法不处理 "thread switch"?
在另一个线程中执行某些工作的更好模式是什么? (除了使用 Observable.create
并手动创建一个新线程,这非常冗长)
List<Task> tasks = ...;
Observable.from(tasks)
.flatMap(task -> {
// should be handled in a new thread
try {
return Observable.just(task.call());
} catch (Exception e) {
log.error("Error", e);
}
return Observable.empty();
})
.subscribeOn(Schedulers.newThread())
.observeOn(MySchedulers.main())
.subscribe(this::show); // subscribe called from main thread
警告:我不是 Java 程序员,而是 C#,所以所有奇怪的驼峰式方法名称都让我感到害怕和困惑。
如果您想为 flatMap
操作创建一个新线程,则 subscribeOn
位置错误。将其插入 from
和 flatMap
之间。有关 subscribeOn
和 observeOn
的完整解释,请参阅 this answer - 它是为 .NET 编写的,但原理是相同的。
我不熟悉 Java 中的任务,所以我不确定您的 Task
是否像 .NET 的 Task
以及 task.call()
是否是异步的并启动自己的线程 - 我猜不是你的问题,因为你说“......应该在 a 新线程中逐一处理的任务列表”。
A newThread
调度程序使用一个新线程 每个订阅者 - 由于 flatMap
将进行单个订阅,因此所有 task.call
调用都将是在同一个线程上创建,虽然这将来自 from
操作员的线程。
如果 task.call
实际上是异步的,那么结果将返回,但是它引入了并发性并且独立于 Rx 的语义。
无论哪种方式,(正确放置的)observeOn
都会将结果传递给主线程上的 this::show
。