在 Dart 中组合 Future 流

Combining Streams of Futures in Dart

将多个流与流转换器和一个 returns 未来的异步函数相结合,我最终得到了一个未来流。

我可以使用类似的东西来扁平化 Future 的流:

Stream<T> flattenStreamsOfFutures<T>(Stream<Future<T>> source) async* {
  await for (var future in source) yield* Stream.fromFuture(future);
}

将 Future 转换为流对我来说有点矫枉过正,但我​​没有遇到任何优雅的解决方案。我当然可以将它传递给另一个带有侦听器的可观察对象,但我怀疑我只是忽略了一种更简单的方法?

您的方法存在一个问题,即 returned 流在所有 futures 完成之前不会 return 任何内容。

不过,还有一种更惯用的方法可以从一系列未来创建流:Stream.fromFutures

Stream.fromFutures([
  Future.delayed(Duration(seconds: 3), () => 'a'),
  Future.delayed(Duration(seconds: 2), () => 'b'),
  Future.delayed(Duration(milliseconds: 1500), () => 'c'),
]).listen(print);

如果输入必须是 期货而不是列表,那么在 vanilla Dart 中没有一种简单的方法可以做到这一点。这种方法至少应该使它按预期运行。

Stream<T> flattenFutures<T>(Stream<Future<T>> source) {
  final controller = StreamController<T>();
  source.listen((future) => future.then((value) => controller.add(value)));
  return controller.stream;
}

首先,一串期货是危险的。我强烈建议尽可能避免使用它。

对于您当前的示例,如果您的期货没有任何错误,您可以直接等待未来而不是将其转换为流:

Stream<T> flattenStreamsOfFutures<T>(Stream<Future<T>> source) async* {
  await for (var future in source) yield await future;
}

这几乎是等待流的未来,然后等待每个未来,最后创建未来结果流的最惯用方式。 不过,它会在 first 错误处中断。 如果你的未来永远没有任何错误,这很好,你可以在这里停止阅读。

如果您真的想将所有错误从 futures 转发到结果流,那么您的原始代码是完美的! async* 函数在不关闭流的情况下在流上发出错误的唯一方法是 yield* 包含错误的流。

一个替代版本,针对可读性而非简洁性进行了优化,可以是:

Stream<T> flattenStreamsOfFutures<T>(Stream<Future<T>> source) async* {
  await for (var future in source) {
    try {
      yield await future;
    } catch (error, stack) {
      yield* Stream.error(error, stack);
    }
  }
}

但结果与使用 Stream.fromFuture 相同。

我在这里谈论的危险是,当您等待那个未来(或 yield*' 那个流时),您暂停了您正在收听的流。 这本身通常不是问题,因为流是为暂停而构建的。如果有更多数据传入,它只会被缓冲。 但是,当数据是未来数据时,并且该未来可能会因错误而完成,延迟交付意味着您可能没有时间在未来完成之前向未来添加 错误处理程序错误。这将使错误成为 未处理的错误,这可能会使您的应用程序崩溃。所以,这很糟糕。

因此,如果您的流中可能有错误的未来,您需要尽快。即使等待流来传递未来也可能为时已晚,这就是为什么您通常根本不应该有 Stream<Future>。你需要非常聪明来避免问题(你必须同步交付期货,你必须立即对暂停和取消作出反应,并且你不能在创建未来和交付它之间引入其他延迟。)

我建议您重写创建 Stream<Future<T>> 的代码,只创建一个 Stream<T>。如果没有看到原始代码,我不能说如何做到这一点,但您可能会立即等待使用 returns 期货的函数创建的未来,然后将值或错误作为流事件,而不是发送未来本身。