如何检查当前元素是最后一个元素的运算符?

How do I check in an operator that current element is last element?

上下文: 要处理 Flowable<Item>,我需要先处理第一个 item,然后根据该处理将所有项目累积到一个项目中 (reduce),或者在每个项目上应用一个简单的映射而不任意累积 (map).

我能想到的一种方法要求操作员知道当前元素是最后一个元素。 有没有这样的运算符知道当前元素是否是最后一个元素?我不能使用缓冲区,因为即使不应该进行累加,它也会始终获取 2 个元素。

AtomicReference<Item> itemRef = new AtomicReference();
itemRef.set(new Item());
Flowable<Item> accumulateOrProcessFlowable = source.
    flatMap(item -> {
        if(item.shouldBeAccumulated()) {
            //Accumulate data into reference
            itemRef.set(itemRef.get().addData(item.getData()));
            //Return empty to throw away consumed item;
            return Flowable.empty();
        } else {
            item.updateProperty();
            return Flowable.just(item);
        }
    })
    .applyIfLastElement(item -> {
        if (item.shouldBeAccumulated()) {
            return Flowable.just(itemRef.get());
        }
    })

下面是你如何做到的(在 RxJava 2.x 中,它非常接近 RxJava 3.x)。诀窍是使用 defer(为 Flowable 封装状态以便它可以被多次订阅的最佳方式)和 concatWithdefer 还可以在 last 的情况下启用惰性求值。另请注意,作为一项性能改进,您可能并不关心我使用了一个元素数组而不是 AtomicReference 对象(以避免不必要的易失性读取、设置等)。

Flowable<Integer> result = Flowable.defer(() -> {
    boolean[] isFirst = new boolean[] { true };
    Integer[] state = new Integer[1];
    Maybe<Integer> last = Maybe.defer(() -> {
        if (state[0] == null) {
            return Maybe.empty();
        } else {
            return Maybe.just(state[0]);
        }
    });
    return source //
            .flatMap(x -> {
                if (state[0] != null || isFirst[0] && shouldBeAccumulated(x)) {
                        // accumulate
                        state[0] = state[0] == null ? 0 : state[0] + x;
                        isFirst[0] = false;
                        return Flowable.empty();
                    } else {
                        isFirst[0] = false;
                        return Flowable.just(x);
                    }
                })
            .concatWith(last);
    });