如何在 Java 中创建异步 producer/source?
How to make an asynchronous producer/source in Java?
我有一个任务列表,需要一个方法处理,这个过程需要很长时间,所以我希望列表中的任务以异步方式一个一个地处理,return 结果作为一个异步流,所以下游处理不需要等待整个任务列表的完成:
AsyncStream<R> methodA(List<T> tasks){tasks.forEach(t -> {calculation that takes a long time})}
在网上简单搜索了一下,发现RxJava可以处理异步流数据,但是介绍好像没有解释如何创建异步数据流。那么如何在Java中创建异步producer/source?
您可以创建异步 Observable
,它会在给定任务的计算完成后立即发出值。为此,您需要 flatMap
运算符。在一个简化的例子中,这看起来像:
static Observable<String> methodA(List<String> tasks) {
return Observable.from(tasks)
.flatMap(t -> Observable.just(t)
.map(t1 -> longRunningTask(t1))
.subscribeOn(Schedulers.io())
);
}
static String longRunningTask(String arg) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return arg;
}
您将您的任务映射到 Observable
并使用 subscribeOn
,以便当某些东西订阅它们时,订阅发生在不同的线程上。 flatMap
运算符立即订阅所有这些 Observables
并在它们准备好后立即发出值。计算是异步的,因为订阅发生在 Scedulers.io
池的不同线程中。
我有一个任务列表,需要一个方法处理,这个过程需要很长时间,所以我希望列表中的任务以异步方式一个一个地处理,return 结果作为一个异步流,所以下游处理不需要等待整个任务列表的完成:
AsyncStream<R> methodA(List<T> tasks){tasks.forEach(t -> {calculation that takes a long time})}
在网上简单搜索了一下,发现RxJava可以处理异步流数据,但是介绍好像没有解释如何创建异步数据流。那么如何在Java中创建异步producer/source?
您可以创建异步 Observable
,它会在给定任务的计算完成后立即发出值。为此,您需要 flatMap
运算符。在一个简化的例子中,这看起来像:
static Observable<String> methodA(List<String> tasks) {
return Observable.from(tasks)
.flatMap(t -> Observable.just(t)
.map(t1 -> longRunningTask(t1))
.subscribeOn(Schedulers.io())
);
}
static String longRunningTask(String arg) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return arg;
}
您将您的任务映射到 Observable
并使用 subscribeOn
,以便当某些东西订阅它们时,订阅发生在不同的线程上。 flatMap
运算符立即订阅所有这些 Observables
并在它们准备好后立即发出值。计算是异步的,因为订阅发生在 Scedulers.io
池的不同线程中。