RxJava:并行处理并合并结果(不打乱顺序)
RxJava: process in parallel and combine results (without messing up the order)
我对使用反应流还很陌生,遇到了以下问题,这让我很难解决。 Objective 是从MongoDB 数据库中获取一些文档。对于每个文档,从数据库中获取元数据并从数据库中获取文件(示例代码中还没有)。然后我们需要将所有数据上传到 s3(结合所有三个项目)。但是,我坚持在不弄乱元素顺序的情况下组合不同的发布者。
Publisher<Document> p = versionCollection.find();
ConnectableFlowable<Document> version = Flowable.fromPublisher(p).publish();
Observable<GridFSFile> gridFS = version
.map(extractID())
.flatMap(loadGridFSFile()).toObservable();
Observable c = version.toObservable()
.zipWith(gridFS, (Document v, GridFSFile f) -> {
// if I check here if both messages belong together, the order sometimes is messed up
return v;
});
version.connect();
所以,基本上我试图将事件发布到两个不同的路径,一个路径从 GridFS 获取元数据,然后我尝试再次组合这两个路径(这样我就可以访问初始文档和元数据)。但是,我注意到有时事件会以不同的顺序压缩(可能是因为查询 db 有时需要不同的时间)。
每个事件的执行路径应该是这样的
v
|
/ | \
v query db query db
\ | /
upload aggregate
of all 3 elements
本质上问题是,使用我的方法,我最终得到了对不同元素 v 的较早或较晚查询的结果。我可能需要以某种方式确保执行路径在所有 3 个路径之间同步发生一次输入一个元素,但我不知道如何。
编辑
我终于找到了一种似乎可以满足需要的方法。然而,感觉有点奇怪,并行处理事物并确保它们保持同步似乎很复杂
Publisher<Document> p = versionCollection.find();
Observable<Document> version = Observable.fromPublisher(p);
version.flatMap(v -> {
ConnectableObservable<Document> connectableObservable = Observable.just(v).replay();
Observable o = connectableObservable
.map(extractAudioID())
.flatMap(loadGridFSFile(audioBucket));
Observable o3 = connectableObservable.zipWith(o, (Document a, GridFSFile f) -> {
// now everything seems to stay in order here
// and we can combine both results
});
o3.subscribe();
o.subscribe();
Disposable a = connectableObservable.connect();
return connectableObservable;
}, 1).blockingSubscribe();
static Function<ObjectId, ObservableSource<GridFSFile>> loadGridFSFile(GridFSBucket audioBucket) {
return id -> Observable.fromPublisher(audioBucket.find(new Document("_id", id)).first());
}
一些似乎可以解决问题的事情:
- 从 RxJava2 开始,通常使用 Flowables 而不是 Observables 更好,因为它们可以处理背压 https://blog.kaush.co/2017/06/21/rxjava1-rxjava2-migration-understanding-changes/
- flatMap 本质上不关心顺序,而 concatMap 关心:https://medium.com/appunite-edu-collection/rxjava-flatmap-switchmap-and-concatmap-differences-examples-6d1f3ff88ee0
现在这段代码看起来更合理了:
ConnectableFlowable<Document> version = Flowable.fromPublisher(p).replay();
Flowable<GridFSFile> file = version
.map(extractID())
.concatMap(loadGridFSFile(audioBucket));
Flowable<GridFSDownloadStream> data = version
.map(extractID())
.map(loadGridFSData(audioBucket));
Flowable c = Flowable.zip(version, file, data, (v, f, d) -> {
// so far everything seems to stay in order
return v;
});
version.connect();
c.subscribe();
我对使用反应流还很陌生,遇到了以下问题,这让我很难解决。 Objective 是从MongoDB 数据库中获取一些文档。对于每个文档,从数据库中获取元数据并从数据库中获取文件(示例代码中还没有)。然后我们需要将所有数据上传到 s3(结合所有三个项目)。但是,我坚持在不弄乱元素顺序的情况下组合不同的发布者。
Publisher<Document> p = versionCollection.find();
ConnectableFlowable<Document> version = Flowable.fromPublisher(p).publish();
Observable<GridFSFile> gridFS = version
.map(extractID())
.flatMap(loadGridFSFile()).toObservable();
Observable c = version.toObservable()
.zipWith(gridFS, (Document v, GridFSFile f) -> {
// if I check here if both messages belong together, the order sometimes is messed up
return v;
});
version.connect();
所以,基本上我试图将事件发布到两个不同的路径,一个路径从 GridFS 获取元数据,然后我尝试再次组合这两个路径(这样我就可以访问初始文档和元数据)。但是,我注意到有时事件会以不同的顺序压缩(可能是因为查询 db 有时需要不同的时间)。
每个事件的执行路径应该是这样的
v
|
/ | \
v query db query db
\ | /
upload aggregate
of all 3 elements
本质上问题是,使用我的方法,我最终得到了对不同元素 v 的较早或较晚查询的结果。我可能需要以某种方式确保执行路径在所有 3 个路径之间同步发生一次输入一个元素,但我不知道如何。
编辑
我终于找到了一种似乎可以满足需要的方法。然而,感觉有点奇怪,并行处理事物并确保它们保持同步似乎很复杂
Publisher<Document> p = versionCollection.find();
Observable<Document> version = Observable.fromPublisher(p);
version.flatMap(v -> {
ConnectableObservable<Document> connectableObservable = Observable.just(v).replay();
Observable o = connectableObservable
.map(extractAudioID())
.flatMap(loadGridFSFile(audioBucket));
Observable o3 = connectableObservable.zipWith(o, (Document a, GridFSFile f) -> {
// now everything seems to stay in order here
// and we can combine both results
});
o3.subscribe();
o.subscribe();
Disposable a = connectableObservable.connect();
return connectableObservable;
}, 1).blockingSubscribe();
static Function<ObjectId, ObservableSource<GridFSFile>> loadGridFSFile(GridFSBucket audioBucket) {
return id -> Observable.fromPublisher(audioBucket.find(new Document("_id", id)).first());
}
一些似乎可以解决问题的事情:
- 从 RxJava2 开始,通常使用 Flowables 而不是 Observables 更好,因为它们可以处理背压 https://blog.kaush.co/2017/06/21/rxjava1-rxjava2-migration-understanding-changes/
- flatMap 本质上不关心顺序,而 concatMap 关心:https://medium.com/appunite-edu-collection/rxjava-flatmap-switchmap-and-concatmap-differences-examples-6d1f3ff88ee0
现在这段代码看起来更合理了:
ConnectableFlowable<Document> version = Flowable.fromPublisher(p).replay();
Flowable<GridFSFile> file = version
.map(extractID())
.concatMap(loadGridFSFile(audioBucket));
Flowable<GridFSDownloadStream> data = version
.map(extractID())
.map(loadGridFSData(audioBucket));
Flowable c = Flowable.zip(version, file, data, (v, f, d) -> {
// so far everything seems to stay in order
return v;
});
version.connect();
c.subscribe();