订阅者可以在不关闭流的情况下处理飞镖流中抛出的异常吗?

Can exceptions thrown in dart streams be handled by subscribers without closing the stream?

我无法理解的简短示例:

Stream<int> getNumbersWithException() async* {
  for (var i = 0; i < 10; i++) {
    yield i;
    if (i == 3) throw Exception();
  }
}

使用情况:

getNumbersWithException()
    .handleError((x) => print('Exception caught for $x'))
    .listen((event) {
  print('Observed: $event');
});

这将在输出 3 处停止:

Observed: 0
Observed: 1
Observed: 2
Observed: 3
Exception caught for Exception: foo

根据文档 (https://dart.dev/tutorials/language/streams) and (https://api.dart.dev/stable/2.9.1/dart-async/Stream/handleError.html),这符合预期,因为抛出的异常将自动关闭流。

  1. 这是否意味着在流中处理异常的正确方法,以便订阅可以在此类事件中长期存在,是在流本身内部处理异常?不可能从外面这样做?
  2. 广播流也一样吗?
  3. 如果我以错误的方式思考这个问题,可以从哪些方面开始正确思考?

我目前认为流是异步数据事件的来源,偶尔可能是错误事件。从文档和示例来看,一切看起来都很整洁,但我认为想要处理错误并继续观察数据流是一个正常的用例。我很难编写代码来这样做。但是,我可能会做错。任何见解将不胜感激。


编辑: 我可以补充一点,我尝试了各种方法,例如使用流转换器,结果相同:

var transformer = StreamTransformer<int, dynamic>.fromHandlers(
  handleData: (data, sink) => sink.add(data),
  handleError: (error, stackTrace, sink) =>
      print('Exception caught for $error'),
  handleDone: (sink) => sink.close(),
);
getNumbersWithException().transform(transformer).listen((data) {
  print('Observed: $data');
});

此外,listen() 有一个可选参数 cancelOnError,看起来很有希望,但它默认为 false,所以这里没有雪茄。

生成器方法

Stream<int> getNumbersWithException() async* {
  for (var i = 0; i < 10; i++) {
    yield i;
    if (i == 3) throw Exception();
  }
}

将在您抛出异常时终止。 throw 正常工作,它不会直接将异常添加到流中。因此,它通过循环和方法体传播出去,直到整个方法体以抛出的异常结束。 那时未处理的异常被添加到流中,然后流被关闭,因为正文已经结束。

所以,问题不在于处理,而在于流的生成。 您确实必须在本地处理错误以避免它结束流生成主体。

您不能在 async* 方法中使用 throw 将多个错误添加到流中,错误将是流所做的最后一件事。

实际发出多个错误的可用技巧是产生异常

  if (i == 3) yield* () async* { throw Exception(); }();
  // or:      yield* Stream.fromFuture(Future.error(Exception());

这将直接向生成的流发出异常,而不会在本地抛出异常并结束生成器方法主体。