RxJava - 用 x of y 计数器

RxJava - counter with x of y

我该怎么做?我想出了以下方法,它允许我计算 y 的 x 中的 x,但是我怎样才能改进它来计算 y?

Observable<LoadedPhoneContact> phoneContacts = getPhoneContacts()
        .flatMapObservable(contacts -> Observable.fromIterable(contacts));

Observable<Integer> phoneContactsCounter = phoneContacts
        .cache()
        .map(contact -> 1)
        .reduce((c1, c2) -> c2 + c2)
        .toObservable();

mSimils = phoneContacts
        .zipWith(phoneContactsCounter, (contact, index) -> new Pair<>(contact, index))
        .doOnNext(data ->  {
            L.d("x / y = %d / %d", data.second, ?);
        })
        .map(data -> SimilUtil.calcSimils(data))
        ;

有什么想法或better/alternative解决方案吗?

我希望能够以 "x of y items processed"...

这样的格式查看进度

我可以用第三个只发出计数的 observable 压缩,但这是正确的方法吗?

这个怎么样:

Observable<?> result = Observable.defer(() -> {
    int[] counter = { 1 };
    return getPhoneContacts()
        .toObservable()
        .flatMapIterable(list -> list, (list, element) -> {
             L.d("x / y = %d / %d", counter[0]++, list.size());
             return SimilUtil.calcSimils(element);
        });
});

我绝对建议不要在 Rx 回调中产生副作用,因为这很容易导致线程同步问题(首先使用 Rx 可以避免这种情况)。

相反,您可以使用具有 .mapWithIndex(...) 运算符的 rxjava-extras