如何重置 EventStream 中的最后一个累加值?

How to reset the last accumulation value in EventStream?

我有一个来自 N 个可观察值的合并 EventStream。从这个值我想要最小的。例如:

    Var<Integer> a = Var.newSimpleVar(2);
    Var<Integer> b = Var.newSimpleVar(3);
    Var<Integer> c = Var.newSimpleVar(4);
    ...

    EventStream<Integer> m = EventStreams.merge(a.values(), b.values(), c.values(), ...);

    m = m.filter(integer -> integer > 1).accumulate((i1, i2) -> Math.min(i1, i2));
    m.subscribe(integer -> System.out.println(integer));


    a.setValue(0); // At this point the Input is filtered and not emitted also I dont want the last value "2" in the accumulation. 
    b.setValue(5);
    c.setValue(3);
    //Output is always "2".

我的问题是我想要在第一个过滤值之后还有一个新的初始值用于累积。在这种情况下,例如 "Integer.MAX_VALUE".

所以 accumulate 中的下一个比较不是:
"Math.min(2,5)" -> "Math.min(2,3)"
但是
"Math.min(MAX_VALUE,5)" -> "Math.min(5,3)".

所以输出不应该是:
2, 2, 2, 2, 2
但是

a -> 2 : 输出最小值 2
b -> 3 : 输出最小值 2
c -> 4 : 输出最小值 2

a -> 0:OK 条件(值 < 1)为真。现在重置或更好地重复流(不保留累积的最后一个值 2)

b -> 5 : 输出最小值 5
c -> 3 : 输出最小值 3
a -> 4 : 输出最小值 3
...

我会用

EventStreams.combine(a.values(), b.values(), c.values())
            .map(t3 -> t3.map((a, b, c) -> min3(a, b, c)));

您在此处定义 min3 以取 3 个值中的最小值,但忽略零。

好的,我找到了解决办法。 忽略零是一个非常好的提示。谢谢托马斯 :)

    Var<Integer> a = Var.newSimpleVar(2);
    Var<Integer> b = Var.newSimpleVar(3);
    Var<Integer> c = Var.newSimpleVar(4);
    ...

    EventStream<Integer> m = EventStreams.merge(a.values(), b.values(), c.values(), ...);
    m = m.accumulate((i1, i2) -> i2 < 1 ? Integer.MAX_VALUE : Math.min(i1, i2)).filter(integer -> integer != Integer.MAX_VALUE);
    m.subscribe(integer -> System.out.println(integer));

    a.setValue(0);
    b.setValue(5);
    c.setValue(3);
    a.setValue(4);

输出为:
a -> 2 : 输出最小值 2
b -> 3 : 输出最小值 2
c -> 4 : 输出最小值 2

a -> 0:无输出

b -> 5 : 输出最小值 5
c -> 3 : 输出最小值 3
a -> 4 : 输出最小值 3

所以问题是我无法在累积执行之前进行过滤(在这种情况下)。 还有一些问题,例如,如果此流中的第一个值为零(修复看起来像 ... (i1, i2) -> i1 < 1 ? i2 : i2 < 1 ? Integer.MAX_VALUE ...)。 但无论如何,在我的情况或类似的情况下,这种解决方案有效或应该有效;)