在 java 中累积流
Accumulating streams in java
最近我一直在尝试将我的数据解析器重新实现到 java 中的流中,但我不知道如何做一件具体的事情:
考虑带有时间戳的对象 A。
考虑由各种 A 对象组成的对象 B
考虑一些告诉我们对象 B 的时间范围的指标。
我现在拥有的是一些带有状态的方法,它遍历带有对象 A 的列表,如果它适合最后一个对象 B,它就会到达那里,否则它会创建新的 B 实例并开始将对象 A 放在那里。
我想以流的方式进行
获取对象A的整个列表并将其作为流。现在我需要找出将创建 "chunks" 并将它们累积到对象 B 中的函数。我该怎么做?
谢谢
编辑:
A 和 B 很复杂,但我会尝试 post 这里的一些简化版本。
class A {
private final long time;
private A(long time) {
this.time = time;
}
long getTime() {
return time;
}
}
class B {
// not important, build from "full" temporaryB class
// result of accumulation
}
class TemporaryB {
private final long startingTime;
private int counter;
public TemporaryB(A a) {
this.startingTime = a.getTime();
}
boolean fits(A a) {
return a.getTime() - startingTime < THRESHOLD;
}
void add(A a) {
counter++;
}
}
class Accumulator {
private List<B> accumulatedB;
private TemporaryBParameters temporaryBParameters
public void addA(A a) {
if(temporaryBParameters.fits(a)) {
temporaryBParameters.add(a)
} else {
accumulateB.add(new B(temporaryBParameters)
temporaryBParameters = new TemporaryBParameters(a)
}
}
}
好的,这是非常简单的方法,我现在该怎么做。我不喜欢它。它很丑。
一般来说,此类问题不适合 Stream API,因为您可能需要 non-local 知识,这会使并行处理变得更加困难。假设您有 new A(1)
、new A(2)
、new A(3)
等等直到 new A(1000)
,其中 Threshold
设置为 10
。所以你基本上需要按 10 个元素将输入组合成批次。这里我们遇到了与 讨论的相同问题:当我们将任务拆分为子任务时,后缀部分可能不知道前缀部分中有多少元素,因此它甚至无法开始将数据组合成批,直到整个前缀被处理。你的问题本质上是连续的。
另一方面,新的 headTail
method in my StreamEx 库提供了一个解决方案。这种方法并行化很差,但是有了它你可以在几行内定义几乎任何操作。
以下是如何使用 headTail
解决您的问题:
static StreamEx<TemporaryB> combine(StreamEx<A> input, TemporaryB tb) {
return input.headTail((head, tail) ->
tb == null ? combine(tail, new TemporaryB(head)) :
tb.fits(head) ? combine(tail, tb.add(head)) :
combine(tail, new TemporaryB(head)).prepend(tb),
() -> StreamEx.ofNullable(tb));
}
我在这里修改了你的TemporaryB
方法:
TemporaryB add(A a) {
counter++;
return this;
}
样本(假设Threshold
= 1000):
List<A> input = Arrays.asList(new A(1), new A(10), new A(1000), new A(1001), new A(
1002), new A(1003), new A(2000), new A(2002), new A(2003), new A(2004));
Stream<B> streamOfB = combine(StreamEx.of(input), null).map(B::new);
streamOfB.forEach(System.out::println);
输出(我写的很简单B.toString()
):
B [counter=2, startingTime=1]
B [counter=3, startingTime=1001]
B [counter=2, startingTime=2002]
所以这里你实际上有一个懒惰的 Stream
of B
.
解释:
StreamEx.headTail
参数是两个 lambda。当输入流为 non-empty 时,First 最多被调用一次。它接收第一个流元素(head)和包含所有其他元素的流(tail)。当输入流为空且不接收任何参数时,第二个最多调用一次。两者都应该产生一个将被使用的输出流。所以我们这里有:
return input.headTail((head, tail) ->
tb == null
是起始案例,从 head
创建新的 TemporaryB
并使用 tail
:
调用自身
tb == null ? combine(tail, new TemporaryB(head)) :
tb.fits(head)
?好的,只需将 head
添加到现有 tb
并使用 tail
:
调用 self
tb.fits(head) ? combine(tail, tb.add(head)) :
否则再次创建新的 TemporaryB(head)
,但也在输出流前面加上当前的 tb
(实际上是将新元素发射到目标流中):
combine(tail, new TemporaryB(head)).prepend(tb),
输入流耗尽?好吧,return最后一集tb
如果有的话:
() -> StreamEx.ofNullable(tb));
请注意,headTail
实现保证这种解决方案在查找递归时不会占用堆栈和堆超过常量。如果您有疑问,可以在数千个输入元素上进行检查:
Stream<B> streamOfB = combine(LongStreamEx.range(100000).mapToObj(A::new), null).map(B::new);
streamOfB.forEach(System.out::println);
最近我一直在尝试将我的数据解析器重新实现到 java 中的流中,但我不知道如何做一件具体的事情:
考虑带有时间戳的对象 A。 考虑由各种 A 对象组成的对象 B 考虑一些告诉我们对象 B 的时间范围的指标。
我现在拥有的是一些带有状态的方法,它遍历带有对象 A 的列表,如果它适合最后一个对象 B,它就会到达那里,否则它会创建新的 B 实例并开始将对象 A 放在那里。
我想以流的方式进行
获取对象A的整个列表并将其作为流。现在我需要找出将创建 "chunks" 并将它们累积到对象 B 中的函数。我该怎么做?
谢谢
编辑:
A 和 B 很复杂,但我会尝试 post 这里的一些简化版本。
class A {
private final long time;
private A(long time) {
this.time = time;
}
long getTime() {
return time;
}
}
class B {
// not important, build from "full" temporaryB class
// result of accumulation
}
class TemporaryB {
private final long startingTime;
private int counter;
public TemporaryB(A a) {
this.startingTime = a.getTime();
}
boolean fits(A a) {
return a.getTime() - startingTime < THRESHOLD;
}
void add(A a) {
counter++;
}
}
class Accumulator {
private List<B> accumulatedB;
private TemporaryBParameters temporaryBParameters
public void addA(A a) {
if(temporaryBParameters.fits(a)) {
temporaryBParameters.add(a)
} else {
accumulateB.add(new B(temporaryBParameters)
temporaryBParameters = new TemporaryBParameters(a)
}
}
}
好的,这是非常简单的方法,我现在该怎么做。我不喜欢它。它很丑。
一般来说,此类问题不适合 Stream API,因为您可能需要 non-local 知识,这会使并行处理变得更加困难。假设您有 new A(1)
、new A(2)
、new A(3)
等等直到 new A(1000)
,其中 Threshold
设置为 10
。所以你基本上需要按 10 个元素将输入组合成批次。这里我们遇到了与
另一方面,新的 headTail
method in my StreamEx 库提供了一个解决方案。这种方法并行化很差,但是有了它你可以在几行内定义几乎任何操作。
以下是如何使用 headTail
解决您的问题:
static StreamEx<TemporaryB> combine(StreamEx<A> input, TemporaryB tb) {
return input.headTail((head, tail) ->
tb == null ? combine(tail, new TemporaryB(head)) :
tb.fits(head) ? combine(tail, tb.add(head)) :
combine(tail, new TemporaryB(head)).prepend(tb),
() -> StreamEx.ofNullable(tb));
}
我在这里修改了你的TemporaryB
方法:
TemporaryB add(A a) {
counter++;
return this;
}
样本(假设Threshold
= 1000):
List<A> input = Arrays.asList(new A(1), new A(10), new A(1000), new A(1001), new A(
1002), new A(1003), new A(2000), new A(2002), new A(2003), new A(2004));
Stream<B> streamOfB = combine(StreamEx.of(input), null).map(B::new);
streamOfB.forEach(System.out::println);
输出(我写的很简单B.toString()
):
B [counter=2, startingTime=1]
B [counter=3, startingTime=1001]
B [counter=2, startingTime=2002]
所以这里你实际上有一个懒惰的 Stream
of B
.
解释:
StreamEx.headTail
参数是两个 lambda。当输入流为 non-empty 时,First 最多被调用一次。它接收第一个流元素(head)和包含所有其他元素的流(tail)。当输入流为空且不接收任何参数时,第二个最多调用一次。两者都应该产生一个将被使用的输出流。所以我们这里有:
return input.headTail((head, tail) ->
tb == null
是起始案例,从 head
创建新的 TemporaryB
并使用 tail
:
tb == null ? combine(tail, new TemporaryB(head)) :
tb.fits(head)
?好的,只需将 head
添加到现有 tb
并使用 tail
:
tb.fits(head) ? combine(tail, tb.add(head)) :
否则再次创建新的 TemporaryB(head)
,但也在输出流前面加上当前的 tb
(实际上是将新元素发射到目标流中):
combine(tail, new TemporaryB(head)).prepend(tb),
输入流耗尽?好吧,return最后一集tb
如果有的话:
() -> StreamEx.ofNullable(tb));
请注意,headTail
实现保证这种解决方案在查找递归时不会占用堆栈和堆超过常量。如果您有疑问,可以在数千个输入元素上进行检查:
Stream<B> streamOfB = combine(LongStreamEx.range(100000).mapToObj(A::new), null).map(B::new);
streamOfB.forEach(System.out::println);