如何检查当前元素是最后一个元素的运算符?
How do I check in an operator that current element is last element?
上下文:
要处理 Flowable<Item>
,我需要先处理第一个 item
,然后根据该处理将所有项目累积到一个项目中 (reduce
),或者在每个项目上应用一个简单的映射而不任意累积 (map
).
我能想到的一种方法要求操作员知道当前元素是最后一个元素。
有没有这样的运算符知道当前元素是否是最后一个元素?我不能使用缓冲区,因为即使不应该进行累加,它也会始终获取 2 个元素。
AtomicReference<Item> itemRef = new AtomicReference();
itemRef.set(new Item());
Flowable<Item> accumulateOrProcessFlowable = source.
flatMap(item -> {
if(item.shouldBeAccumulated()) {
//Accumulate data into reference
itemRef.set(itemRef.get().addData(item.getData()));
//Return empty to throw away consumed item;
return Flowable.empty();
} else {
item.updateProperty();
return Flowable.just(item);
}
})
.applyIfLastElement(item -> {
if (item.shouldBeAccumulated()) {
return Flowable.just(itemRef.get());
}
})
下面是你如何做到的(在 RxJava 2.x 中,它非常接近 RxJava 3.x)。诀窍是使用 defer
(为 Flowable 封装状态以便它可以被多次订阅的最佳方式)和 concatWith
。 defer
还可以在 last
的情况下启用惰性求值。另请注意,作为一项性能改进,您可能并不关心我使用了一个元素数组而不是 AtomicReference 对象(以避免不必要的易失性读取、设置等)。
Flowable<Integer> result = Flowable.defer(() -> {
boolean[] isFirst = new boolean[] { true };
Integer[] state = new Integer[1];
Maybe<Integer> last = Maybe.defer(() -> {
if (state[0] == null) {
return Maybe.empty();
} else {
return Maybe.just(state[0]);
}
});
return source //
.flatMap(x -> {
if (state[0] != null || isFirst[0] && shouldBeAccumulated(x)) {
// accumulate
state[0] = state[0] == null ? 0 : state[0] + x;
isFirst[0] = false;
return Flowable.empty();
} else {
isFirst[0] = false;
return Flowable.just(x);
}
})
.concatWith(last);
});
上下文:
要处理 Flowable<Item>
,我需要先处理第一个 item
,然后根据该处理将所有项目累积到一个项目中 (reduce
),或者在每个项目上应用一个简单的映射而不任意累积 (map
).
我能想到的一种方法要求操作员知道当前元素是最后一个元素。 有没有这样的运算符知道当前元素是否是最后一个元素?我不能使用缓冲区,因为即使不应该进行累加,它也会始终获取 2 个元素。
AtomicReference<Item> itemRef = new AtomicReference();
itemRef.set(new Item());
Flowable<Item> accumulateOrProcessFlowable = source.
flatMap(item -> {
if(item.shouldBeAccumulated()) {
//Accumulate data into reference
itemRef.set(itemRef.get().addData(item.getData()));
//Return empty to throw away consumed item;
return Flowable.empty();
} else {
item.updateProperty();
return Flowable.just(item);
}
})
.applyIfLastElement(item -> {
if (item.shouldBeAccumulated()) {
return Flowable.just(itemRef.get());
}
})
下面是你如何做到的(在 RxJava 2.x 中,它非常接近 RxJava 3.x)。诀窍是使用 defer
(为 Flowable 封装状态以便它可以被多次订阅的最佳方式)和 concatWith
。 defer
还可以在 last
的情况下启用惰性求值。另请注意,作为一项性能改进,您可能并不关心我使用了一个元素数组而不是 AtomicReference 对象(以避免不必要的易失性读取、设置等)。
Flowable<Integer> result = Flowable.defer(() -> {
boolean[] isFirst = new boolean[] { true };
Integer[] state = new Integer[1];
Maybe<Integer> last = Maybe.defer(() -> {
if (state[0] == null) {
return Maybe.empty();
} else {
return Maybe.just(state[0]);
}
});
return source //
.flatMap(x -> {
if (state[0] != null || isFirst[0] && shouldBeAccumulated(x)) {
// accumulate
state[0] = state[0] == null ? 0 : state[0] + x;
isFirst[0] = false;
return Flowable.empty();
} else {
isFirst[0] = false;
return Flowable.just(x);
}
})
.concatWith(last);
});