RxJava 平面图链接请求
RxJava flatmap chaining requests
我正在将 Retrofit 与 RxJAva 用于获取 Rss Feed 的应用程序,但 rss 不包含所有信息,因此我使用 jsoup 解析每个项目 link,以检索图像和文章的描述。现在我这样使用它:
public Observable<Rss> getDumpData() {
return newsAppService.getDumpData()
.flatMap(rss -> Observable.from(rss.channel.items)
.observeOn(Schedulers.io())
.flatMap(Checked.f1(item -> Observable.just(Jsoup.connect(item.link).get())
.observeOn(Schedulers.io())
.map(document -> document.select("div[itemprop=image] > img").first())
.doOnNext(element -> item.image = element.attr("src"))
)))
.defaultIfEmpty(rss)
.ignoreElements()
.observeOn(Schedulers.io())
.subscribeOn(AndroidSchedulers.mainThread());
}
我在这一行收到一个错误:defaultIfEmpty(rss)
它不识别平面图的 rss。当我将 defaultIfEmpty(rss)
移动到平面图括号中时,出现另一个错误,指出必须将 return 类型更改为 Element。他们有什么解决办法吗?
您不能将一个 RxJava
参数(flatMap
lambda 参数)的内部参数与另一个运算符参数(defaultIfEmpty
)混合使用。
首先,创建一个辅助函数来保持主反应流更干净:
private Observable<List<Item>> getDetails(List<Item> items) {
return Observable.from(items)
.observeOn(Schedulers.io())
.flatMap(Checked.f1(item ->
Observable.zip(
Observable.just(item),
Observable.just(Jsoup.connect(item.link).get())
.observeOn(Schedulers.io())
.map(document -> document.select("div[itemprop=image] > img").first()),
(itemInner, element) -> {
itemInner.image = element.attr("src");
return itemInner;
}
)
))
.toList();
}
然后重新格式化主函数:
newsAppService.getDumpData()
.flatMap(rss ->
Observable.zip(
Observable.<Rss>just(rss),
getDetails(rss.channel.items),
(rssInner, items) -> {
rssInner.channel.items = items;
return rss;
}).onErrorResumeNext((throwable -> Observable.just(rss))
)
)
.observeOn(Schedulers.io())
.subscribeOn(AndroidSchedulers.mainThread());
希望我能正确瞄准你的目标。它可能行不通,因为我无法对其进行测试,但我希望你能理解。我使用 .zip
的原因是您不能丢失对当前解析的 item
或 rss
的引用
首先你需要摆脱所有与 observeOn 的并发并使用 subscribeOn。
.observeOn(Schedulers.io())
如果想将另一个线程的数据同步回事件循环,请考虑将 observeOn 与 AndroidScheduler 一起使用。通常,您会在订阅可观察对象之前使用 observeOn,以便同步回 ui-loop 并更改 ui-information.
.observeOn(AndroidSchedulers.mainThread())
其次,不建议改变管道中的对象。你应该 return 一个新的对象。
.doOnNext(element -> item.image = element.attr("src"))
考虑到前两点,我尝试重构您的解决方案。我正在使用 RxJava2-RC5
flatMap 运算符有很多重载。其中之一提供了将传入值和创建值压缩在一起的功能。
Observable<Rss> rssItemObservable = newsService.getDumpData()
.flatMap(rss -> getRssItemInformation(rss).subscribeOn(Schedulers.io()),
(r, rItemList) -> {
Rss rInterim = new Rss();
rInterim.items = rItemList;
return rInterim;
});
用于检索 Rss 中每个项目的信息的帮助方法。请考虑使用 maxConcurrency 的重载,因为默认情况下它会立即订阅每个流。因此 flatMap 会创建许多 http 请求。
private Observable<List<RssItem>> getRssItemInformation(Rss rss) {
return Observable.fromIterable(rss.items)
.flatMap(rssItem -> getImageUrl(rssItem).subscribeOn(Schedulers.io()), (rItem, img) -> {
RssItem item = new RssItem();
printCurrentThread("merge1");
item.image = img;
item.link = rItem.link;
return item;
}).toList().toObservable();
}
检索图像的帮助方法url。返回 observable 与并发无关。如果发生错误,空字符串将被 returned 作为默认值。
private Observable<String> getImageUrl(String link) {
return Observable.fromCallable(() -> Jsoup.connect(link).get())
.map(document -> document.select("div[itemprop=image] > img").first())
.map(element -> element.attr("src"))
.onErrorResumeNext(throwable -> {
return Observable.just("");
});
}
您可以在 github.gist 查看完整示例:https://gist.github.com/anonymous/a8e36205fc2430517c66c802f6eef38e
我正在将 Retrofit 与 RxJAva 用于获取 Rss Feed 的应用程序,但 rss 不包含所有信息,因此我使用 jsoup 解析每个项目 link,以检索图像和文章的描述。现在我这样使用它:
public Observable<Rss> getDumpData() {
return newsAppService.getDumpData()
.flatMap(rss -> Observable.from(rss.channel.items)
.observeOn(Schedulers.io())
.flatMap(Checked.f1(item -> Observable.just(Jsoup.connect(item.link).get())
.observeOn(Schedulers.io())
.map(document -> document.select("div[itemprop=image] > img").first())
.doOnNext(element -> item.image = element.attr("src"))
)))
.defaultIfEmpty(rss)
.ignoreElements()
.observeOn(Schedulers.io())
.subscribeOn(AndroidSchedulers.mainThread());
}
我在这一行收到一个错误:defaultIfEmpty(rss)
它不识别平面图的 rss。当我将 defaultIfEmpty(rss)
移动到平面图括号中时,出现另一个错误,指出必须将 return 类型更改为 Element。他们有什么解决办法吗?
您不能将一个 RxJava
参数(flatMap
lambda 参数)的内部参数与另一个运算符参数(defaultIfEmpty
)混合使用。
首先,创建一个辅助函数来保持主反应流更干净:
private Observable<List<Item>> getDetails(List<Item> items) {
return Observable.from(items)
.observeOn(Schedulers.io())
.flatMap(Checked.f1(item ->
Observable.zip(
Observable.just(item),
Observable.just(Jsoup.connect(item.link).get())
.observeOn(Schedulers.io())
.map(document -> document.select("div[itemprop=image] > img").first()),
(itemInner, element) -> {
itemInner.image = element.attr("src");
return itemInner;
}
)
))
.toList();
}
然后重新格式化主函数:
newsAppService.getDumpData()
.flatMap(rss ->
Observable.zip(
Observable.<Rss>just(rss),
getDetails(rss.channel.items),
(rssInner, items) -> {
rssInner.channel.items = items;
return rss;
}).onErrorResumeNext((throwable -> Observable.just(rss))
)
)
.observeOn(Schedulers.io())
.subscribeOn(AndroidSchedulers.mainThread());
希望我能正确瞄准你的目标。它可能行不通,因为我无法对其进行测试,但我希望你能理解。我使用 .zip
的原因是您不能丢失对当前解析的 item
或 rss
首先你需要摆脱所有与 observeOn 的并发并使用 subscribeOn。
.observeOn(Schedulers.io())
如果想将另一个线程的数据同步回事件循环,请考虑将 observeOn 与 AndroidScheduler 一起使用。通常,您会在订阅可观察对象之前使用 observeOn,以便同步回 ui-loop 并更改 ui-information.
.observeOn(AndroidSchedulers.mainThread())
其次,不建议改变管道中的对象。你应该 return 一个新的对象。
.doOnNext(element -> item.image = element.attr("src"))
考虑到前两点,我尝试重构您的解决方案。我正在使用 RxJava2-RC5
flatMap 运算符有很多重载。其中之一提供了将传入值和创建值压缩在一起的功能。
Observable<Rss> rssItemObservable = newsService.getDumpData()
.flatMap(rss -> getRssItemInformation(rss).subscribeOn(Schedulers.io()),
(r, rItemList) -> {
Rss rInterim = new Rss();
rInterim.items = rItemList;
return rInterim;
});
用于检索 Rss 中每个项目的信息的帮助方法。请考虑使用 maxConcurrency 的重载,因为默认情况下它会立即订阅每个流。因此 flatMap 会创建许多 http 请求。
private Observable<List<RssItem>> getRssItemInformation(Rss rss) {
return Observable.fromIterable(rss.items)
.flatMap(rssItem -> getImageUrl(rssItem).subscribeOn(Schedulers.io()), (rItem, img) -> {
RssItem item = new RssItem();
printCurrentThread("merge1");
item.image = img;
item.link = rItem.link;
return item;
}).toList().toObservable();
}
检索图像的帮助方法url。返回 observable 与并发无关。如果发生错误,空字符串将被 returned 作为默认值。
private Observable<String> getImageUrl(String link) {
return Observable.fromCallable(() -> Jsoup.connect(link).get())
.map(document -> document.select("div[itemprop=image] > img").first())
.map(element -> element.attr("src"))
.onErrorResumeNext(throwable -> {
return Observable.just("");
});
}
您可以在 github.gist 查看完整示例:https://gist.github.com/anonymous/a8e36205fc2430517c66c802f6eef38e