StreamEx 分组到列表 returns 不正确的记录数

StreamEx grouping into lists returns an incorrect number of records

以下代码将对象流分成 1000 个块,在物化时处理它们,最后 returns 对象总数。

在所有情况下,数字 returned 都是正确的,除非流大小恰好为 1。在流大小为 1 的情况下,数字 returned 为 0。

如有任何帮助,我们将不胜感激。如果流中没有记录为 0,我还不得不破解 return 调用。我也想解决这个问题。

AtomicInteger recordCounter = new AtomicInteger(0);
try (StreamEx<MyObject> stream = StreamEx.of(myObjects)) {
        stream.groupRuns((prev, next) -> recordCounter.incrementAndGet() % 1000 != 0)
              .forEach((chunk) ->
                      {
                          //... process each chunk
                      }
              );
    } catch(Exception e) {
        throw new MyRuntimeException("Failure streaming...", e);
    } finally {
        myObjects.close();
    }

return recordCounter.get() == 0 ? 0 : recordCounter.incrementAndGet();

Originally 计数器用于了解何时拆分块,并且计算对象总数是不可靠的。当流的大小为 0 或 1 时 groupRuns 函数不被执行。

所以你需要另一种方法来计算对象。您可以 return 处理 chunk.size() 的对象数量,而不是仅仅消耗 forEach 中的项目,最后 sum 它们

    AtomicInteger counter = new AtomicInteger(0);
    try (StreamEx<MyObject> stream = StreamEx.of(myObjects)) {
        return stream
                .groupRuns((prev, next) -> counter.incrementAndGet() % 1000 != 0)
                .mapToLong((chunk) -> {
                     //... process each chunk
                     return chunk.size();
                 })
                .sum();
    } catch(Exception e) {
        throw new MyRuntimeException("Failure streaming...", e);
    } finally {
        myObjects.close();
    }

@Nazarii Bardiuk 解释说,为什么它不起作用。我满足了之前拆分流的类似要求。所以我将其分叉并在 StreamEx-0.8.7 处做了一些更改。这是一个简单的例子:

int count = IntStreamEx.range(0, 10).boxed().splitToList(3).mapToInt(chunk -> {
    System.out.println(chunk);
    return chunk.size();
}).sum();

System.out.println(count);

如果您正处于项目的开始,您可以尝试一下,代码将是:

try (StreamEx<MyObject> stream = StreamEx.of(myObjects).onClose(() -> myObjects.close())) {
    return stream.splitToList(1000)
                 .mapToInt((chunk) -> {
                              //... process each chunk
                     return chunk.size();
                  }).sum();
}

正如JavaDoc所说:

sameGroup - a non-interfering, stateless predicate to apply to the pair of adjacent elements which returns true for elements which belong to the same group.

谓词必须是无状态的,这不是你的情况。您滥用了该方法,这就是您无法获得预期结果的原因。它完全接近您想要的效果纯属偶然,您不能依赖这种行为,它可能会在未来的 StreamEx 版本中改变。

最后我使用 Guava 的 Iterators.partition() 将我的对象流分成块:

MutableInt recordCounter = new MutableInt();
try {
    Iterators.partition(myObjects.iterator(), 1000)
             .forEachRemaining((chunk) -> {
                      //process each chunk
                      ...
                      recordCounter.add(chunk.size());
             });
} catch (Exception e) {
    throw new MyRuntimeException("Failure streaming...", e);
} finally {
    myObjects.close();
}

return recordCounter.getValue();