RxJava 平面图链接请求

RxJava flatmap chaining requests

我正在将 Retrofit 与 RxJAva 用于获取 Rss Feed 的应用程序,但 rss 不包含所有信息,因此我使用 jsoup 解析每个项目 link,以检索图像和文章的描述。现在我这样使用它:

public Observable<Rss> getDumpData() {
    return newsAppService.getDumpData()
            .flatMap(rss -> Observable.from(rss.channel.items)
            .observeOn(Schedulers.io())
            .flatMap(Checked.f1(item -> Observable.just(Jsoup.connect(item.link).get())
            .observeOn(Schedulers.io())
            .map(document -> document.select("div[itemprop=image] > img").first())
                    .doOnNext(element -> item.image = element.attr("src"))
            )))
            .defaultIfEmpty(rss)
            .ignoreElements()
            .observeOn(Schedulers.io())
            .subscribeOn(AndroidSchedulers.mainThread());
}

我在这一行收到一个错误:defaultIfEmpty(rss) 它不识别平面图的 rss。当我将 defaultIfEmpty(rss) 移动到平面图括号中时,出现另一个错误,指出必须将 return 类型更改为 Element。他们有什么解决办法吗?

您不能将一个 RxJava 参数(flatMap lambda 参数)的内部参数与另一个运算符参数(defaultIfEmpty)混合使用。

首先,创建一个辅助函数来保持主反应流更干净:

private Observable<List<Item>> getDetails(List<Item> items) {
    return Observable.from(items)
               .observeOn(Schedulers.io())
               .flatMap(Checked.f1(item ->
                   Observable.zip(
                       Observable.just(item),
                            Observable.just(Jsoup.connect(item.link).get())
                           .observeOn(Schedulers.io())
                           .map(document -> document.select("div[itemprop=image] > img").first()),
                           (itemInner, element) -> {
                                itemInner.image = element.attr("src");
                                return itemInner;
                           }
                   )
               ))
               .toList();
}

然后重新格式化主函数:

newsAppService.getDumpData()
    .flatMap(rss ->
        Observable.zip(
            Observable.<Rss>just(rss),
            getDetails(rss.channel.items),
            (rssInner, items) -> {
                rssInner.channel.items = items;
                return rss;
            }).onErrorResumeNext((throwable -> Observable.just(rss))
        )
    )
    .observeOn(Schedulers.io())
    .subscribeOn(AndroidSchedulers.mainThread());

希望我能正确瞄准你的目标。它可能行不通,因为我无法对其进行测试,但我希望你能理解。我使用 .zip 的原因是您不能丢失对当前解析的 itemrss

的引用

首先你需要摆脱所有与 observeOn 的并发并使用 subscribeOn。

.observeOn(Schedulers.io())

如果想将另一个线程的数据同步回事件循环,请考虑将 observeOn 与 AndroidScheduler 一起使用。通常,您会在订阅可观察对象之前使用 observeOn,以便同步回 ui-loop 并更改 ui-information.

.observeOn(AndroidSchedulers.mainThread())

其次,不建议改变管道中的对象。你应该 return 一个新的对象。

.doOnNext(element -> item.image = element.attr("src"))

考虑到前两点,我尝试重构您的解决方案。我正在使用 RxJava2-RC5

flatMap 运算符有很多重载。其中之一提供了将传入值和创建值压缩在一起的功能。

Observable<Rss> rssItemObservable = newsService.getDumpData()
                .flatMap(rss -> getRssItemInformation(rss).subscribeOn(Schedulers.io()),
                        (r, rItemList) -> {
                            Rss rInterim = new Rss();
                            rInterim.items = rItemList;
                            return rInterim;
                        });

用于检索 Rss 中每个项目的信息的帮助方法。请考虑使用 maxConcurrency 的重载,因为默认情况下它会立即订阅每个流。因此 flatMap 会创建许多 http 请求。

private Observable<List<RssItem>> getRssItemInformation(Rss rss) {
        return Observable.fromIterable(rss.items)
                .flatMap(rssItem -> getImageUrl(rssItem).subscribeOn(Schedulers.io()), (rItem, img) -> {
                    RssItem item = new RssItem();
                    printCurrentThread("merge1");
                    item.image = img;
                    item.link = rItem.link;
                    return item;
                }).toList().toObservable();
}

检索图像的帮助方法url。返回 observable 与并发无关。如果发生错误,空字符串将被 returned 作为默认值。

private Observable<String> getImageUrl(String link) {
           return Observable.fromCallable(() -> Jsoup.connect(link).get())
                .map(document -> document.select("div[itemprop=image] > img").first())
                .map(element -> element.attr("src"))
                .onErrorResumeNext(throwable -> {
                    return Observable.just("");
                });
}

您可以在 github.gist 查看完整示例:https://gist.github.com/anonymous/a8e36205fc2430517c66c802f6eef38e