RxJava + 改造,获取列表并为每个项目添加额外信息

RxJava + retrofit, get a List and add extra info for each item

我正在玩 RXJava,在 Android 中进行改造。我正在努力实现以下目标:

我需要定期轮询一个给我 Observable 的调用>(从这里我可以做到)

一旦我得到这个列表,我想在每个交付中迭代并调用另一个方法来给我预计到达时间(所以只是更多信息)我想将这个新信息附加到交付中并返回完整列表每个项目附加的额外信息。

一旦我得到列表,我知道如何在没有 rxjava 的情况下做到这一点,但我想练习一下。

到目前为止,这是我的代码:

pollDeliveries = Observable.interval(POLLING_INTERVAL, TimeUnit.SECONDS, Schedulers.from(AsyncTask.THREAD_POOL_EXECUTOR))
            .map(tick -> RestClient.getInstance().getApiService().getDeliveries())
                    .doOnError(err -> Log.e("MPB", "Error retrieving messages" + err))
                    .retry()
                    .subscribe(deliveries -> {
                        MainApp.getEventBus().postSticky(deliveries);
                    });

这是给我一份交货清单。现在我想完成第二部分。

希望我已经足够清楚了。 谢谢

我没有花很多时间在 Java 8 个 lambda 上,但这里有一个将每个对象映射到不同对象的示例,然后在另一端得到一个 List<...>普通 ol' Java 7:

List<Delivery> deliveries = ...;
Observable.from(deliveries).flatMap(new Func1<Delivery, Observable<ETA>>() {
    @Override
    public Observable<ETA> call(Delivery delivery) {
        // Convert delivery to ETA...
        return someEta;
    }
})
.toList().subscribe(new Action1<List<ETA>>() {
    @Override
    public void call(List<ETA> etas) {

    }
});

当然,最好采用 Retrofit 响应(大概是 Observable<List<Delivery>>?)并观察其中的每一个。为此,我们理想地使用 flatten() 之类的东西,其中 doesn't appear to be coming to RxJava anytime soon.

为此,您可以改为执行类似的操作(使用 lambda 会更好)。您将上面示例中的 Observable.from(deliveries) 替换为以下内容:

apiService.getDeliveries().flatMap(new Func1<List<Delivery>, Observable<Delivery>>() {
    @Override
    public Observable<Delivery> call(List<Delivery> deliveries) {
        return Observable.from(deliveries);
    }
}).flatMap(...)

最后我找到了一个很好的方法。

private void startPolling() {
    pollDeliveries = Observable.interval(POLLING_INTERVAL, TimeUnit.SECONDS, Schedulers.from(AsyncTask.THREAD_POOL_EXECUTOR))
            .flatMap(tick -> getDeliveriesObs())
            .doOnError(err -> Log.e("MPB", "Error retrieving messages" + err))
            .retry()
            .subscribe(this::parseDeliveries, Throwable::printStackTrace);
}

private Observable<List<Delivery>> getDeliveriesObs() {
    return RestClient.getInstance().getApiService().getDeliveries()
            .flatMap(Observable::from)
            .flatMap(this::getETAForDelivery)
            .toSortedList((d1, d2) -> {
                if (d1.getEta() == null) {
                    return -1;
                }
                if (d2.getEta() == null) {
                    return 1;
                }
                return d1.getEta().getDuration().getValue() > d2.getEta().getDuration().getValue() ? 1 : -1;
            });
}

让我们一步一步来。

  1. 首先,我们创建一个 Observable,它每 POLLING_INTERVAL 次触发方法 getDeliveriesObs() 将 return 最终列表
  2. 我们使用 retrofit 来获取调用的 Observable
  3. 我们使用 flatMap 来扁平化结果列表,并在下一个平面图中得到一个 Delivery 项目,一个接一个。
  4. 然后我们得到 Delivery 对象中设置的预计到达时间 return 它
  5. 我们按预计到达时间对列表进行排序。
  6. 如果出现错误,我们打印并重试,这样间隔就不会停止
  7. 我们最终订阅以对列表进行排序并在其中包含预计到达时间,然后我们就 return 它或您需要用它做的任何事情。

它工作正常而且非常好,我开始喜欢 rxjava :)