流供应商收到错误 'stream has already been operated upon or closed'

Stream supplier getting error 'stream has already been operated upon or closed'

即使我正在为我的流使用 Supplier 并在每次我想检索我的 strem 并对其执行终端操作时使用 Supplier.Get(),我仍然收到 "stream has already been operated upon or closed" 异常.谁能看看我的代码并指出我做错了什么?

抛出异常的方法:

private static void printMyDetails(Supplier<Stream<Result>> mySupplier, String myStatus) {
        checkNotNull(mySupplier);
        checkArgument(isNotEmpty(myStatus), "Invalid My status");

        if (mySupplier.get().noneMatch(result -> true)) { //<-This is where the exception is thrown
            if (log.isInfoEnabled()) {
                log.info("Number of My with status '{}': {}", My, 0);
            }
        } else {
            log.info("Number of My with status '{}': {}", myStatus, mySupplier.get().count());
            log.info("Details of My(s) with status: '{}'", myStatus);
            mySupplier.get().sorted().forEach(Utils::printMyNameAndDetails);
        }
    }

调用上述方法的地方:

rb.keySet().stream().filter(key -> containsIgnoreCase(key, "status")).map(rb::getString)
                .filter(StringUtils::isNotEmpty).forEach(status -> {
            var resultsWithExpectedStatusSupplier = requireNonNull(getResultsWithExpectedStatusSupplier(results, status));
            resultsWithExpectedStatusSupplier.ifPresentOrElse(streamSupplier -> printMyDetails(streamSupplier, status), () -> {
                if (log.isInfoEnabled())
                    log.info("0 My with status: {}", status);
            });
        });

直播供应商:

private static Optional<Supplier<Stream<Result>>> getResultsWithExpectedStatusSupplier(
            @NotNull List<Result> results, @NotNull String expectedStatus) {
        checkArgument(!results.isEmpty(), "Results list is empty");
        checkArgument(isNotEmpty(expectedStatus), "Invalid expected status");

        var resultStreamWithExpectedStatus = requireNonNull(results.stream().filter(result -> ofNullable(result).map(Result::getStatus)
                .allMatch(s -> isNotEmpty(s) && s.equalsIgnoreCase(expectedStatus))));
        return resultStreamWithExpectedStatus.count() == 0 ? Optional.empty() : Optional.of(() -> resultStreamWithExpectedStatus);
    }

您只能使用一次流。看起来 Supplier 总是一次又一次地提供相同的 Stream。在第一个终端操作之后,Stream 被排空;来自 Supplier 的 Stream 必须始终是新的 Stream。

普遍的问题正如Christian Ullenboom 所说:流已被消耗。您代码中的确切位置是方法 getResultsWithExpectedStatusSupplier 中对 resultStreamWithExpectedStatus.count() 的调用,因为 Stream.count 是消耗流的 reduction/terminal 操作。

,您无法在不消耗流的情况下获取流大小。通过将过滤后的项目存储在一个集合中(例如 Collectors.toList)、在那里查询大小并返回集合本身而不是流来修复它?

附带说明一下,我认为您对 Optional 的滥用有点过分了。代码可以更简单地传递空流(或者甚至更好:传递一个空的、过滤的集合)。