我应该如何将 List<Observable<T>> 压缩成 Observable<List<T>>?

How should I zip a List<Observable<T>> into an Observable<List<T>>?

我有一个 List<Observable<T>>,我想将其转换为 Observable<List<T>>

我知道 Observable.zip,这似乎是正确的功能,但我不确定如何定义 zipper 参数。

这是我尝试过的:

final List<Observable<T>> tasks = getTasks();

final Observable<List<T>> task = Observable.zip(
    tasks, 
    x -> ImmutableList.copyOf(x)
        .stream()
        .map(x -> (T)x)
        .collect(ImmutableList.toImmutableList()));

但是,这需要未经检查的转换。

我应该如何在 RxJava 2 中处理这个问题?


注意 指的是 RxJava 1。

应该这样做

final List<Observable<T>> tasks = getTasks();
List<T> values = getTasks().stream()
                           .map(observable -> observable.firstElement().blockingGet())
                           .collect(Collectors.toList());
Observable<List<T>> result = Observable.fromArray(values);

使用基元:

List<Observable<T>> tasks = getTasks();
Observable<List<T>> task = Observable.merge(tasks).toList();

但是,您真的需要一次完成所有任务吗?您可以跳过 toList 并在任务到来时对其进行处理;这将为您提供更好的响应能力和更轻松的并发控制。