StreamEx 分组到列表 returns 不正确的记录数
StreamEx grouping into lists returns an incorrect number of records
以下代码将对象流分成 1000 个块,在物化时处理它们,最后 returns 对象总数。
在所有情况下,数字 returned 都是正确的,除非流大小恰好为 1。在流大小为 1 的情况下,数字 returned 为 0。
如有任何帮助,我们将不胜感激。如果流中没有记录为 0,我还不得不破解 return 调用。我也想解决这个问题。
AtomicInteger recordCounter = new AtomicInteger(0);
try (StreamEx<MyObject> stream = StreamEx.of(myObjects)) {
stream.groupRuns((prev, next) -> recordCounter.incrementAndGet() % 1000 != 0)
.forEach((chunk) ->
{
//... process each chunk
}
);
} catch(Exception e) {
throw new MyRuntimeException("Failure streaming...", e);
} finally {
myObjects.close();
}
return recordCounter.get() == 0 ? 0 : recordCounter.incrementAndGet();
Originally 计数器用于了解何时拆分块,并且计算对象总数是不可靠的。当流的大小为 0 或 1 时 groupRuns
函数不被执行。
所以你需要另一种方法来计算对象。您可以 return 处理 chunk.size()
的对象数量,而不是仅仅消耗 forEach
中的项目,最后 sum
它们
AtomicInteger counter = new AtomicInteger(0);
try (StreamEx<MyObject> stream = StreamEx.of(myObjects)) {
return stream
.groupRuns((prev, next) -> counter.incrementAndGet() % 1000 != 0)
.mapToLong((chunk) -> {
//... process each chunk
return chunk.size();
})
.sum();
} catch(Exception e) {
throw new MyRuntimeException("Failure streaming...", e);
} finally {
myObjects.close();
}
@Nazarii Bardiuk 解释说,为什么它不起作用。我满足了之前拆分流的类似要求。所以我将其分叉并在 StreamEx-0.8.7 处做了一些更改。这是一个简单的例子:
int count = IntStreamEx.range(0, 10).boxed().splitToList(3).mapToInt(chunk -> {
System.out.println(chunk);
return chunk.size();
}).sum();
System.out.println(count);
如果您正处于项目的开始,您可以尝试一下,代码将是:
try (StreamEx<MyObject> stream = StreamEx.of(myObjects).onClose(() -> myObjects.close())) {
return stream.splitToList(1000)
.mapToInt((chunk) -> {
//... process each chunk
return chunk.size();
}).sum();
}
正如JavaDoc所说:
sameGroup - a non-interfering, stateless predicate to apply to the pair of adjacent elements which returns true for elements which belong to the same group.
谓词必须是无状态的,这不是你的情况。您滥用了该方法,这就是您无法获得预期结果的原因。它完全接近您想要的效果纯属偶然,您不能依赖这种行为,它可能会在未来的 StreamEx 版本中改变。
最后我使用 Guava 的 Iterators.partition() 将我的对象流分成块:
MutableInt recordCounter = new MutableInt();
try {
Iterators.partition(myObjects.iterator(), 1000)
.forEachRemaining((chunk) -> {
//process each chunk
...
recordCounter.add(chunk.size());
});
} catch (Exception e) {
throw new MyRuntimeException("Failure streaming...", e);
} finally {
myObjects.close();
}
return recordCounter.getValue();
以下代码将对象流分成 1000 个块,在物化时处理它们,最后 returns 对象总数。
在所有情况下,数字 returned 都是正确的,除非流大小恰好为 1。在流大小为 1 的情况下,数字 returned 为 0。
如有任何帮助,我们将不胜感激。如果流中没有记录为 0,我还不得不破解 return 调用。我也想解决这个问题。
AtomicInteger recordCounter = new AtomicInteger(0);
try (StreamEx<MyObject> stream = StreamEx.of(myObjects)) {
stream.groupRuns((prev, next) -> recordCounter.incrementAndGet() % 1000 != 0)
.forEach((chunk) ->
{
//... process each chunk
}
);
} catch(Exception e) {
throw new MyRuntimeException("Failure streaming...", e);
} finally {
myObjects.close();
}
return recordCounter.get() == 0 ? 0 : recordCounter.incrementAndGet();
Originally 计数器用于了解何时拆分块,并且计算对象总数是不可靠的。当流的大小为 0 或 1 时 groupRuns
函数不被执行。
所以你需要另一种方法来计算对象。您可以 return 处理 chunk.size()
的对象数量,而不是仅仅消耗 forEach
中的项目,最后 sum
它们
AtomicInteger counter = new AtomicInteger(0);
try (StreamEx<MyObject> stream = StreamEx.of(myObjects)) {
return stream
.groupRuns((prev, next) -> counter.incrementAndGet() % 1000 != 0)
.mapToLong((chunk) -> {
//... process each chunk
return chunk.size();
})
.sum();
} catch(Exception e) {
throw new MyRuntimeException("Failure streaming...", e);
} finally {
myObjects.close();
}
@Nazarii Bardiuk 解释说,为什么它不起作用。我满足了之前拆分流的类似要求。所以我将其分叉并在 StreamEx-0.8.7 处做了一些更改。这是一个简单的例子:
int count = IntStreamEx.range(0, 10).boxed().splitToList(3).mapToInt(chunk -> {
System.out.println(chunk);
return chunk.size();
}).sum();
System.out.println(count);
如果您正处于项目的开始,您可以尝试一下,代码将是:
try (StreamEx<MyObject> stream = StreamEx.of(myObjects).onClose(() -> myObjects.close())) {
return stream.splitToList(1000)
.mapToInt((chunk) -> {
//... process each chunk
return chunk.size();
}).sum();
}
正如JavaDoc所说:
sameGroup - a non-interfering, stateless predicate to apply to the pair of adjacent elements which returns true for elements which belong to the same group.
谓词必须是无状态的,这不是你的情况。您滥用了该方法,这就是您无法获得预期结果的原因。它完全接近您想要的效果纯属偶然,您不能依赖这种行为,它可能会在未来的 StreamEx 版本中改变。
最后我使用 Guava 的 Iterators.partition() 将我的对象流分成块:
MutableInt recordCounter = new MutableInt();
try {
Iterators.partition(myObjects.iterator(), 1000)
.forEachRemaining((chunk) -> {
//process each chunk
...
recordCounter.add(chunk.size());
});
} catch (Exception e) {
throw new MyRuntimeException("Failure streaming...", e);
} finally {
myObjects.close();
}
return recordCounter.getValue();