如何在不破坏运算符链的情况下合并发出 Observable 的 Observable 的输出?
How to merge the output of an Observable that emits Observables without breaking the operator chain?
上下文正在使用 Couchbase 在 2 级文档存储上实现 REST CRUD 服务。数据模型是指向零个或多个项目文档的索引文档。使用异步获取将索引文档作为 Observable 进行检索。接下来是 .flatMap() ,它为每个项目文档检索零个或多个 ID。 async get returns 一个 Observable,所以现在我创建的 Observable 是 Observable>。我想链接一个 .merge() 运算符,它将采用 "an Observable that emits Observables, and will merge their output into the output of a single Observable" 来引用 ReactiveX 文档 :) 然后我将 .subscribe() 到那个单一的 Observable 以检索项目文档。 .merge() 运算符有很多签名,但我不知道如何在如下运算符链中使用它:
bucket
.async()
.get(id)
.flatMap(
document -> {
JsonArray itemArray = (JsonArray) document.content().get("item");
// create Observable that gets and emits zero or more
// Observable<Observable<JsonDocument>> ie. bucket.async().gets
Observable<Observable<JsonDocument>> items =
Observable.create(observer -> {
try {
if (!observer.isUnsubscribed()) {
itemArray.forEach(
(jsonObject) -> {
String itemId = ((JsonObject)jsonObject).get("itemid").toString();
observer.onNext(
bucket.async().get(itemId)
);
}
}
);
observer.onCompleted();
}
} catch (Exception e) {
observer.onError(e);
}
}
);
return items;
},
throwable -> {
// error handling omitted...
return Observable.empty();
},
() -> {
// on complete processing omitted...
return null;
}
)
.merge( ???????? )
.subscribe(
nextItem -> {
// do something with each item document...
},
throwable -> {
// error handling omitted...
},
() -> {
// do something else...
}
);
编辑:
您可能猜到我是一个被动的新手。 @akarnokd 的回答让我意识到我试图做的事情是愚蠢的。解决方案是将 Observable<Observable<JsonDocument>>
项的排放合并到 document
闭包和 return 的结果中。这从 flatMap
:
发出结果 JsonDocuments
bucket
.async()
.get(id)
.flatMap(
document -> {
JsonArray itemArray = (JsonArray) document.content().get("item");
// create Observable that gets and emits zero or more
// Observable<Observable<JsonDocument>> ie. bucket.async().gets
Observable<Observable<JsonDocument>> items =
Observable.create(observer -> {
try {
if (!observer.isUnsubscribed()) {
itemArray.forEach(
(jsonObject) -> {
String itemId = ((JsonObject)jsonObject).get("itemid").toString();
observer.onNext(
bucket.async().get(itemId)
);
}
}
);
observer.onCompleted();
}
} catch (Exception e) {
observer.onError(e);
}
}
);
return Observable.merge(items);
},
throwable -> {
// error handling omitted...
return Observable.empty();
},
() -> {
// on complete processing omitted...
return null;
}
)
.subscribe(
nextItem -> {
// do something with each item document...
},
throwable -> {
// error handling omitted...
},
() -> {
// do something else...
}
);
已测试并有效:)
由于 Java 的表达限制,我们不能有可应用于 Observble<Observable<T>>
的无参数 merge()
运算符。它需要扩展方法,例如 C#。
下一个最好的事情是做一个身份flatMap
:
// ...
.flatMap(document -> ...)
.flatMap(v -> v)
.subscribe(...)
您可以调用 toList()
将所有发出的项目收集到一个列表中。我没有测试过,但是像这样的东西怎么样:
bucket.async()
.get(id)
.flatmap(document -> { return Observable.from((JsonArray)document.content().get("item")})
.flatMap(bucket::get)
.toList()
.subscribe(results -> /* list of documents */);
上下文正在使用 Couchbase 在 2 级文档存储上实现 REST CRUD 服务。数据模型是指向零个或多个项目文档的索引文档。使用异步获取将索引文档作为 Observable 进行检索。接下来是 .flatMap() ,它为每个项目文档检索零个或多个 ID。 async get returns 一个 Observable,所以现在我创建的 Observable 是 Observable>。我想链接一个 .merge() 运算符,它将采用 "an Observable that emits Observables, and will merge their output into the output of a single Observable" 来引用 ReactiveX 文档 :) 然后我将 .subscribe() 到那个单一的 Observable 以检索项目文档。 .merge() 运算符有很多签名,但我不知道如何在如下运算符链中使用它:
bucket
.async()
.get(id)
.flatMap(
document -> {
JsonArray itemArray = (JsonArray) document.content().get("item");
// create Observable that gets and emits zero or more
// Observable<Observable<JsonDocument>> ie. bucket.async().gets
Observable<Observable<JsonDocument>> items =
Observable.create(observer -> {
try {
if (!observer.isUnsubscribed()) {
itemArray.forEach(
(jsonObject) -> {
String itemId = ((JsonObject)jsonObject).get("itemid").toString();
observer.onNext(
bucket.async().get(itemId)
);
}
}
);
observer.onCompleted();
}
} catch (Exception e) {
observer.onError(e);
}
}
);
return items;
},
throwable -> {
// error handling omitted...
return Observable.empty();
},
() -> {
// on complete processing omitted...
return null;
}
)
.merge( ???????? )
.subscribe(
nextItem -> {
// do something with each item document...
},
throwable -> {
// error handling omitted...
},
() -> {
// do something else...
}
);
编辑:
您可能猜到我是一个被动的新手。 @akarnokd 的回答让我意识到我试图做的事情是愚蠢的。解决方案是将 Observable<Observable<JsonDocument>>
项的排放合并到 document
闭包和 return 的结果中。这从 flatMap
:
JsonDocuments
bucket
.async()
.get(id)
.flatMap(
document -> {
JsonArray itemArray = (JsonArray) document.content().get("item");
// create Observable that gets and emits zero or more
// Observable<Observable<JsonDocument>> ie. bucket.async().gets
Observable<Observable<JsonDocument>> items =
Observable.create(observer -> {
try {
if (!observer.isUnsubscribed()) {
itemArray.forEach(
(jsonObject) -> {
String itemId = ((JsonObject)jsonObject).get("itemid").toString();
observer.onNext(
bucket.async().get(itemId)
);
}
}
);
observer.onCompleted();
}
} catch (Exception e) {
observer.onError(e);
}
}
);
return Observable.merge(items);
},
throwable -> {
// error handling omitted...
return Observable.empty();
},
() -> {
// on complete processing omitted...
return null;
}
)
.subscribe(
nextItem -> {
// do something with each item document...
},
throwable -> {
// error handling omitted...
},
() -> {
// do something else...
}
);
已测试并有效:)
由于 Java 的表达限制,我们不能有可应用于 Observble<Observable<T>>
的无参数 merge()
运算符。它需要扩展方法,例如 C#。
下一个最好的事情是做一个身份flatMap
:
// ...
.flatMap(document -> ...)
.flatMap(v -> v)
.subscribe(...)
您可以调用 toList()
将所有发出的项目收集到一个列表中。我没有测试过,但是像这样的东西怎么样:
bucket.async()
.get(id)
.flatmap(document -> { return Observable.from((JsonArray)document.content().get("item")})
.flatMap(bucket::get)
.toList()
.subscribe(results -> /* list of documents */);