即使抛出异常,如何迭代流?

How to iterate a stream even if exceptions are thrown?

stream.map(obj -> doMap(obj)).collect(Collectors.toList());

private String doMap(Object obj) {
    if (objectIsInvalid) {
    throw new ParseException("Object could not be parsed");
    }
}

问题:如何抛出异常并让流迭代知道它不应该中断整个迭代,而是继续下一个元素(并最终记录失败的对象)?

毫无例外,您可以使用 Optionals:

stream.map(obj -> doMap(obj))
      .filter(obj -> obj.isPresent())
      .collect(Collectors.toList());

private Optional<String> doMap(Object obj) {
   if (objectIsInvalid) {
    return Optional.empty();
   }
}

如果你想要一种方法来安静地忽略那些在映射时抛出异常的元素,你可以定义一个辅助方法 returns 成功或失败,作为 0 或 1 个元素的流,并且然后用那个方法 flatMap 你的流。

import static java.util.stream.Collectors.toList;

import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Stream;

public class Fish {
    private int weight;
    public Fish(int weight) { this.weight = weight; }
    public String size() {
        if (weight < 10) {
            return "small";
        } else if (weight < 50) {
            return "medium";
        } else if (weight < 100) {
            return "large";
        } else {
            throw new RuntimeException("I don't believe you");
        }   
    }

    public static <T> Stream<T> successOrNothing(Supplier<T> supplier) {
      try {
        return Stream.of(supplier.get());
      } catch (Exception e) {
        return Stream.empty();
      }
    }

    public static void main(String[] args) {
        Stream<Fish> fishes = Stream.of(new Fish(1000), new Fish(2));
        List<String> sizes = fishes
          .flatMap(fish -> successOrNothing(fish::size))
          .collect(toList());
        System.out.println(sizes);
    }
}

流中的每个元素都映射到 1 个元素(如果调用成功返回)或 0 个元素(如果未成功返回)。这可能比 "change the method to return null or Optional.empty() instead of throwing" 模式更容易或更直接,尤其是在处理其合同已定义且您不想更改的方法时。

警告:此解决方案仍然默默地忽略异常;这在很多情况下都是不合适的。

这里有一个奇怪的技巧可以用来改进异常处理。

假设您的映射函数是这样的:

String doMap(Object obj) {
    if (isInvalid(obj)) {
        throw new IllegalArgumentException("info about obj");
    } else {
        return obj.toString();
    }
}

如果对象有效,则此 return 是一个结果,但如果对象无效,则会引发异常。不幸的是,如果您将其直接粘贴到管道中,任何错误都会停止管道执行。您想要的是类似于 "either" 类型的东西,它可以包含一个值或一个错误指示符(这在 Java 中是一个例外)。

事实证明 CompletableFuture 可以保存值或异常。虽然它是为异步处理而设计的——这里没有发生——我们只需要稍微扭曲它就可以用于我们的目的。

首先,给定 stream 个要处理的对象,我们调用包装在对 supplyAsync:

的调用中的映射函数
 CompletableFuture<String>[] cfArray = 
        stream.map(obj -> CompletableFuture.supplyAsync(() -> doMap(obj), Runnable::run))
              .toArray(n -> (CompletableFuture<String>[])new CompletableFuture<?>[n]);

(不幸的是,通用数组创建给出了未经检查的警告,必须将其抑制。)

奇怪的结构

 CompletableFuture.supplyAsync(supplier, Runnable::run)

运行s 供应商 "asynchronously" 在提供的执行器 Runnable::run 上,它只是 运行s 立即在此线程中的任务。换句话说,它运行同步供应商。

诀窍是从这个调用中 returned 的 CompletableFuture 实例包含来自供应商的值,如果它 returned 正常,或者它包含异常,如果供应商扔了一个。 (我在这里不考虑取消。)然后我们将 CompletableFuture 个实例收集到一个数组中。为什么是数组?这是下一部分的设置:

CompletableFuture.allOf(cfArray).join();

这通常会等待 CF 数组完成。由于它们已 运行 同步,因此它们应该已经全部完成。对于这种情况重要的是,如果数组中的 any 异常完成,join() 将抛出 CompletionException。如果连接正常完成,我们可以简单地收集 return 值。如果连接抛出异常,我们可以传播它,或者我们可以捕获它并处理存储在数组中的 CF 中的异常。例如,

try {
    CompletableFuture.allOf(cfArray).join();
    // no errors
    return Arrays.stream(cfArray)
                 .map(CompletableFuture::join)
                 .collect(toList());
} catch (CompletionException ce) {
    long errcount =
        Arrays.stream(cfArray)
              .filter(CompletableFuture::isCompletedExceptionally)
              .count();
    System.out.println("errcount = " + errcount);
    return Collections.emptyList();
}

如果全部成功,这 return 是一个值列表。如果有任何异常,这将计算异常的数量并且 returns 是一个空列表。当然,您可以轻松地做其他事情,比如记录异常的详细信息、过滤掉异常和 return 有效值列表等。

我会做以下...

stream.map(doMapLenient())
      .filter(Objects::nonNull)
      .collect(Collectors.toList());

private String doMap(Object obj) {
    if (objectIsInvalid(obj)) {
    throw new ParseException("Object could not be parsed");
    }
    return "Streams are cool";
}

private Function<Object, String> doMapLenient() {
  return obj -> {
     try {
       return doMap(obj);   
   } catch (ParseExcepion ex) {
       return null; 
   }
}

在这里你也可以很好地添加一些登录捕获部分

您可以使用迭代器显式地迭代流元素:

Stream<Object> stream = ... // throws unchecked exceptions;
Iterator<Object> iterator = stream.iterator();
while (iterator.hasNext()) {
    try {
        Object obj = it.next();
        // process
    } catch (ParseException ex) {
        // handle
    }
}