Rx Observables:为每个原始项目发出额外的项目,将它们减少到另一种类型,消费

Rx Observables: emit additional item for each original item, reduce them to another type, consume

我在使用 Couchbase Java 客户端 2.2.2 和 Rx Observables 1.0.15 执行以下操作时遇到问题:

到目前为止我想出的东西看起来很有意义:

List<E> resultList = new ArrayList<>();

Observable
    .from(originalDocumentNames)
    .flatmap(key -> {
        Observable firstDocument = bucket.async().get(key);
        Observable secondDocument = bucket.async().get(getSecondKeyNameFrom(key));
        return Observable.merge(firstDocument, secondDocument);
    })
    .reduce((jsonDocument1, jsonDocument2) -> {
        if (jsonDocument1 == null || jsonDocument2 == null) {
            return null;
        }
        resultList.add(createCustomObject(jsonDocument1, jsonDocument2);
        return null;
    })
    .filter(Objects.nonNull)
    .singleOrDefault(null)
    .subscribe(new Subscriber<E>() {
        public void onComplete() {
            //use resultList in a callback function
        }
    });

这不起作用。我不知道在哪里,但我想我使用 Observable.merge 的方式不对。 另外我认为我正在以错误的方式处理整个问题。

所以看起来主要的问题是:

您可以在zip 平面图中使用。 Zip 将发出与 Observable 数量最少的项目一样多的项目。因此,如果其中一个文档丢失,其序列将为空,zip 将跳过它。

Observable
.from(originalDocumentNames)
.flatmap(key -> {
    //the stream of 0-1 original document
    Observable firstDocument = bucket.async().get(key);
    //the stream of 0-1 associated document
    Observable secondDocument = bucket.async().get(getSecondKeyNameFrom(key));

    //using zip and the createCustomObject method reference as a zip function to combine pairs of documents
    return Observable.zip(firstDocument, secondDocument, this::createCustomObject);
})
.toList() //let RxJava aggregate into a List
.subscribe(
    //the "callback" function, onNext will be called only once with toList
    list -> doSomething(list), 
    //always try to define onError (best practice)
    e -> processErrors(e)
);

此代码中存在几个问题:

  1. 副作用,reduce 操作正在添加到 Observable 链之外的列表,这是错误的。 reduce 应该是 return 列表或者根本不存在,因为 Rx 有一个 toList 操作。也因为 returns null 的 reduce 操作,下一个操作必须处理它;这有点不雅。

  2. merge 操作错误,您应该在 flatmap 中改为 zip 并构建 pair/aggregate.

  3. 可选点:flatmap 操作不处理如果任何一个 get 操作将 return 多个项目(也许是 de facto 沙发底座的情况)

请注意,我没有 IDE,所以暂时没有代码。但在我看来,用 zip 替换 merge 并删除 reduce 肯定会有帮助。