Rx Observables:为每个原始项目发出额外的项目,将它们减少到另一种类型,消费
Rx Observables: emit additional item for each original item, reduce them to another type, consume
我在使用 Couchbase Java 客户端 2.2.2 和 Rx Observables 1.0.15 执行以下操作时遇到问题:
- 我有一个字符串列表,它们是文档名称
- 连同每个原始文档的文档名称,我想加载另一个文档(从原始文档名称推导出来),这样我就会得到一对文档。如果这两个文档中的任何一个不存在,请不要再使用这对文档。
- 如果该对有效(即两个文档都存在),则使用这两个文档从中创建自定义对象
- 将那些转换后的项目组合成一个列表
到目前为止我想出的东西看起来很有意义:
List<E> resultList = new ArrayList<>();
Observable
.from(originalDocumentNames)
.flatmap(key -> {
Observable firstDocument = bucket.async().get(key);
Observable secondDocument = bucket.async().get(getSecondKeyNameFrom(key));
return Observable.merge(firstDocument, secondDocument);
})
.reduce((jsonDocument1, jsonDocument2) -> {
if (jsonDocument1 == null || jsonDocument2 == null) {
return null;
}
resultList.add(createCustomObject(jsonDocument1, jsonDocument2);
return null;
})
.filter(Objects.nonNull)
.singleOrDefault(null)
.subscribe(new Subscriber<E>() {
public void onComplete() {
//use resultList in a callback function
}
});
这不起作用。我不知道在哪里,但我想我使用 Observable.merge
的方式不对。
另外我认为我正在以错误的方式处理整个问题。
所以看起来主要的问题是:
- 如何向 Observable 流发送额外的项目?
- 如何将两个项目缩减为另一种类型的项目? (reduce(T, T, T) 不允许这样)
- 我是不是看错了?
您可以在zip
平面图中使用。 Zip 将发出与 Observable
数量最少的项目一样多的项目。因此,如果其中一个文档丢失,其序列将为空,zip 将跳过它。
Observable
.from(originalDocumentNames)
.flatmap(key -> {
//the stream of 0-1 original document
Observable firstDocument = bucket.async().get(key);
//the stream of 0-1 associated document
Observable secondDocument = bucket.async().get(getSecondKeyNameFrom(key));
//using zip and the createCustomObject method reference as a zip function to combine pairs of documents
return Observable.zip(firstDocument, secondDocument, this::createCustomObject);
})
.toList() //let RxJava aggregate into a List
.subscribe(
//the "callback" function, onNext will be called only once with toList
list -> doSomething(list),
//always try to define onError (best practice)
e -> processErrors(e)
);
此代码中存在几个问题:
副作用,reduce
操作正在添加到 Observable
链之外的列表,这是错误的。 reduce
应该是 return 列表或者根本不存在,因为 Rx 有一个 toList
操作。也因为 returns null
的 reduce 操作,下一个操作必须处理它;这有点不雅。
merge
操作错误,您应该在 flatmap
中改为 zip
并构建 pair/aggregate.
可选点:flatmap 操作不处理如果任何一个 get 操作将 return 多个项目(也许是 de facto 沙发底座的情况)
请注意,我没有 IDE,所以暂时没有代码。但在我看来,用 zip
替换 merge
并删除 reduce
肯定会有帮助。
我在使用 Couchbase Java 客户端 2.2.2 和 Rx Observables 1.0.15 执行以下操作时遇到问题:
- 我有一个字符串列表,它们是文档名称
- 连同每个原始文档的文档名称,我想加载另一个文档(从原始文档名称推导出来),这样我就会得到一对文档。如果这两个文档中的任何一个不存在,请不要再使用这对文档。
- 如果该对有效(即两个文档都存在),则使用这两个文档从中创建自定义对象
- 将那些转换后的项目组合成一个列表
到目前为止我想出的东西看起来很有意义:
List<E> resultList = new ArrayList<>();
Observable
.from(originalDocumentNames)
.flatmap(key -> {
Observable firstDocument = bucket.async().get(key);
Observable secondDocument = bucket.async().get(getSecondKeyNameFrom(key));
return Observable.merge(firstDocument, secondDocument);
})
.reduce((jsonDocument1, jsonDocument2) -> {
if (jsonDocument1 == null || jsonDocument2 == null) {
return null;
}
resultList.add(createCustomObject(jsonDocument1, jsonDocument2);
return null;
})
.filter(Objects.nonNull)
.singleOrDefault(null)
.subscribe(new Subscriber<E>() {
public void onComplete() {
//use resultList in a callback function
}
});
这不起作用。我不知道在哪里,但我想我使用 Observable.merge
的方式不对。
另外我认为我正在以错误的方式处理整个问题。
所以看起来主要的问题是:
- 如何向 Observable 流发送额外的项目?
- 如何将两个项目缩减为另一种类型的项目? (reduce(T, T, T) 不允许这样)
- 我是不是看错了?
您可以在zip
平面图中使用。 Zip 将发出与 Observable
数量最少的项目一样多的项目。因此,如果其中一个文档丢失,其序列将为空,zip 将跳过它。
Observable
.from(originalDocumentNames)
.flatmap(key -> {
//the stream of 0-1 original document
Observable firstDocument = bucket.async().get(key);
//the stream of 0-1 associated document
Observable secondDocument = bucket.async().get(getSecondKeyNameFrom(key));
//using zip and the createCustomObject method reference as a zip function to combine pairs of documents
return Observable.zip(firstDocument, secondDocument, this::createCustomObject);
})
.toList() //let RxJava aggregate into a List
.subscribe(
//the "callback" function, onNext will be called only once with toList
list -> doSomething(list),
//always try to define onError (best practice)
e -> processErrors(e)
);
此代码中存在几个问题:
副作用,
reduce
操作正在添加到Observable
链之外的列表,这是错误的。reduce
应该是 return 列表或者根本不存在,因为 Rx 有一个toList
操作。也因为 returnsnull
的 reduce 操作,下一个操作必须处理它;这有点不雅。merge
操作错误,您应该在flatmap
中改为zip
并构建 pair/aggregate.可选点:flatmap 操作不处理如果任何一个 get 操作将 return 多个项目(也许是 de facto 沙发底座的情况)
请注意,我没有 IDE,所以暂时没有代码。但在我看来,用 zip
替换 merge
并删除 reduce
肯定会有帮助。