在 java 中累积流

Accumulating streams in java

最近我一直在尝试将我的数据解析器重新实现到 java 中的流中,但我不知道如何做一件具体的事情:

考虑带有时间戳的对象 A。 考虑由各种 A 对象组成的对象 B 考虑一些告诉我们对象 B 的时间范围的指标。

我现在拥有的是一些带有状态的方法,它遍历带有对象 A 的列表,如果它适合最后一个对象 B,它就会到达那里,否则它会创建新的 B 实例并开始将对象 A 放在那里。

我想以流的方式进行

获取对象A的整个列表并将其作为流。现在我需要找出将创建 "chunks" 并将它们累积到对象 B 中的函数。我该怎么做?
谢谢

编辑:

A 和 B 很复杂,但我会尝试 post 这里的一些简化版本。

class A {
    private final long time;
    private A(long time) {
        this.time = time;
    }
    long getTime() {
        return time;
    }
}

class B {
     // not important, build from "full" temporaryB class
     // result of accumulation         
}

class TemporaryB {
    private final long startingTime;
    private int counter;

    public TemporaryB(A a) {
        this.startingTime = a.getTime();
    }

    boolean fits(A a) {
        return a.getTime() - startingTime < THRESHOLD;
    }

    void add(A a) {
        counter++;
    }
}

class Accumulator {
    private List<B> accumulatedB;
    private TemporaryBParameters temporaryBParameters
    public void addA(A a) {
         if(temporaryBParameters.fits(a)) {
             temporaryBParameters.add(a)
         } else {
             accumulateB.add(new B(temporaryBParameters)
             temporaryBParameters = new TemporaryBParameters(a)
         }
    } 
}

好的,这是非常简单的方法,我现在该怎么做。我不喜欢它。它很丑。

一般来说,此类问题不适合 Stream API,因为您可能需要 non-local 知识,这会使并行处理变得更加困难。假设您有 new A(1)new A(2)new A(3) 等等直到 new A(1000),其中 Threshold 设置为 10。所以你基本上需要按 10 个元素将输入组合成批次。这里我们遇到了与 讨论的相同问题:当我们将任务拆分为子任务时,后缀部分可能不知道前缀部分中有多少元素,因此它甚至无法开始将数据组合成批,直到整个前缀被处理。你的问题本质上是连续的。

另一方面,新的 headTail method in my StreamEx 库提供了一个解决方案。这种方法并行化很差,但是有了它你可以在几行内定义几乎任何操作。

以下是如何使用 headTail 解决您的问题:

static StreamEx<TemporaryB> combine(StreamEx<A> input, TemporaryB tb) {
    return input.headTail((head, tail) ->
        tb == null ? combine(tail, new TemporaryB(head)) :
            tb.fits(head) ? combine(tail, tb.add(head)) :
                combine(tail, new TemporaryB(head)).prepend(tb), 
        () -> StreamEx.ofNullable(tb));
}

我在这里修改了你的TemporaryB方法:

TemporaryB add(A a) {
    counter++;
    return this;
}

样本(假设Threshold = 1000):

List<A> input = Arrays.asList(new A(1), new A(10), new A(1000), new A(1001), new A(
        1002), new A(1003), new A(2000), new A(2002), new A(2003), new A(2004));

Stream<B> streamOfB = combine(StreamEx.of(input), null).map(B::new);
streamOfB.forEach(System.out::println);

输出(我写的很简单B.toString()):

B [counter=2, startingTime=1]
B [counter=3, startingTime=1001]
B [counter=2, startingTime=2002]

所以这里你实际上有一个懒惰的 Stream of B.


解释:

StreamEx.headTail 参数是两个 lambda。当输入流为 non-empty 时,First 最多被调用一次。它接收第一个流元素(head)和包含所有其他元素的流(tail)。当输入流为空且不接收任何参数时,第二个最多调用一次。两者都应该产生一个将被使用的输出流。所以我们这里有:

return input.headTail((head, tail) ->

tb == null 是起始案例,从 head 创建新的 TemporaryB 并使用 tail:

调用自身
    tb == null ? combine(tail, new TemporaryB(head)) :

tb.fits(head) ?好的,只需将 head 添加到现有 tb 并使用 tail:

调用 self
        tb.fits(head) ? combine(tail, tb.add(head)) :

否则再次创建新的 TemporaryB(head),但也在输出流前面加上当前的 tb(实际上是将新元素发射到目标流中):

            combine(tail, new TemporaryB(head)).prepend(tb), 

输入流耗尽?好吧,return最后一集tb如果有的话:

    () -> StreamEx.ofNullable(tb));

请注意,headTail 实现保证这种解决方案在查找递归时不会占用堆栈和堆超过常量。如果您有疑问,可以在数千个输入元素上进行检查:

Stream<B> streamOfB = combine(LongStreamEx.range(100000).mapToObj(A::new), null).map(B::new);
streamOfB.forEach(System.out::println);