在 Dart 中组合 Future 流
Combining Streams of Futures in Dart
将多个流与流转换器和一个 returns 未来的异步函数相结合,我最终得到了一个未来流。
我可以使用类似的东西来扁平化 Future 的流:
Stream<T> flattenStreamsOfFutures<T>(Stream<Future<T>> source) async* {
await for (var future in source) yield* Stream.fromFuture(future);
}
将 Future 转换为流对我来说有点矫枉过正,但我没有遇到任何优雅的解决方案。我当然可以将它传递给另一个带有侦听器的可观察对象,但我怀疑我只是忽略了一种更简单的方法?
您的方法存在一个问题,即 returned 流在所有 futures 完成之前不会 return 任何内容。
不过,还有一种更惯用的方法可以从一系列未来创建流:Stream.fromFutures
。
Stream.fromFutures([
Future.delayed(Duration(seconds: 3), () => 'a'),
Future.delayed(Duration(seconds: 2), () => 'b'),
Future.delayed(Duration(milliseconds: 1500), () => 'c'),
]).listen(print);
如果输入必须是 流 期货而不是列表,那么在 vanilla Dart 中没有一种简单的方法可以做到这一点。这种方法至少应该使它按预期运行。
Stream<T> flattenFutures<T>(Stream<Future<T>> source) {
final controller = StreamController<T>();
source.listen((future) => future.then((value) => controller.add(value)));
return controller.stream;
}
首先,一串期货是危险的。我强烈建议尽可能避免使用它。
对于您当前的示例,如果您的期货没有任何错误,您可以直接等待未来而不是将其转换为流:
Stream<T> flattenStreamsOfFutures<T>(Stream<Future<T>> source) async* {
await for (var future in source) yield await future;
}
这几乎是等待流的未来,然后等待每个未来,最后创建未来结果流的最惯用方式。
不过,它会在 first 错误处中断。
如果你的未来永远没有任何错误,这很好,你可以在这里停止阅读。
如果您真的想将所有错误从 futures 转发到结果流,那么您的原始代码是完美的! async*
函数在不关闭流的情况下在流上发出错误的唯一方法是 yield*
包含错误的流。
一个替代版本,针对可读性而非简洁性进行了优化,可以是:
Stream<T> flattenStreamsOfFutures<T>(Stream<Future<T>> source) async* {
await for (var future in source) {
try {
yield await future;
} catch (error, stack) {
yield* Stream.error(error, stack);
}
}
}
但结果与使用 Stream.fromFuture
相同。
我在这里谈论的危险是,当您等待那个未来(或 yield*
' 那个流时),您暂停了您正在收听的流。
这本身通常不是问题,因为流是为暂停而构建的。如果有更多数据传入,它只会被缓冲。
但是,当数据是未来数据时,并且该未来可能会因错误而完成,延迟交付意味着您可能没有时间在未来完成之前向未来添加 错误处理程序错误。这将使错误成为 未处理的错误,这可能会使您的应用程序崩溃。所以,这很糟糕。
因此,如果您的流中可能有错误的未来,您需要尽快。即使等待流来传递未来也可能为时已晚,这就是为什么您通常根本不应该有 Stream<Future>
。你需要非常聪明来避免问题(你必须同步交付期货,你必须立即对暂停和取消作出反应,并且你不能在创建未来和交付它之间引入其他延迟。)
我建议您重写创建 Stream<Future<T>>
的代码,只创建一个 Stream<T>
。如果没有看到原始代码,我不能说如何做到这一点,但您可能会立即等待使用 returns 期货的函数创建的未来,然后将值或错误作为流事件,而不是发送未来本身。
将多个流与流转换器和一个 returns 未来的异步函数相结合,我最终得到了一个未来流。
我可以使用类似的东西来扁平化 Future 的流:
Stream<T> flattenStreamsOfFutures<T>(Stream<Future<T>> source) async* {
await for (var future in source) yield* Stream.fromFuture(future);
}
将 Future 转换为流对我来说有点矫枉过正,但我没有遇到任何优雅的解决方案。我当然可以将它传递给另一个带有侦听器的可观察对象,但我怀疑我只是忽略了一种更简单的方法?
您的方法存在一个问题,即 returned 流在所有 futures 完成之前不会 return 任何内容。
不过,还有一种更惯用的方法可以从一系列未来创建流:Stream.fromFutures
。
Stream.fromFutures([
Future.delayed(Duration(seconds: 3), () => 'a'),
Future.delayed(Duration(seconds: 2), () => 'b'),
Future.delayed(Duration(milliseconds: 1500), () => 'c'),
]).listen(print);
如果输入必须是 流 期货而不是列表,那么在 vanilla Dart 中没有一种简单的方法可以做到这一点。这种方法至少应该使它按预期运行。
Stream<T> flattenFutures<T>(Stream<Future<T>> source) {
final controller = StreamController<T>();
source.listen((future) => future.then((value) => controller.add(value)));
return controller.stream;
}
首先,一串期货是危险的。我强烈建议尽可能避免使用它。
对于您当前的示例,如果您的期货没有任何错误,您可以直接等待未来而不是将其转换为流:
Stream<T> flattenStreamsOfFutures<T>(Stream<Future<T>> source) async* {
await for (var future in source) yield await future;
}
这几乎是等待流的未来,然后等待每个未来,最后创建未来结果流的最惯用方式。 不过,它会在 first 错误处中断。 如果你的未来永远没有任何错误,这很好,你可以在这里停止阅读。
如果您真的想将所有错误从 futures 转发到结果流,那么您的原始代码是完美的! async*
函数在不关闭流的情况下在流上发出错误的唯一方法是 yield*
包含错误的流。
一个替代版本,针对可读性而非简洁性进行了优化,可以是:
Stream<T> flattenStreamsOfFutures<T>(Stream<Future<T>> source) async* {
await for (var future in source) {
try {
yield await future;
} catch (error, stack) {
yield* Stream.error(error, stack);
}
}
}
但结果与使用 Stream.fromFuture
相同。
我在这里谈论的危险是,当您等待那个未来(或 yield*
' 那个流时),您暂停了您正在收听的流。
这本身通常不是问题,因为流是为暂停而构建的。如果有更多数据传入,它只会被缓冲。
但是,当数据是未来数据时,并且该未来可能会因错误而完成,延迟交付意味着您可能没有时间在未来完成之前向未来添加 错误处理程序错误。这将使错误成为 未处理的错误,这可能会使您的应用程序崩溃。所以,这很糟糕。
因此,如果您的流中可能有错误的未来,您需要尽快。即使等待流来传递未来也可能为时已晚,这就是为什么您通常根本不应该有 Stream<Future>
。你需要非常聪明来避免问题(你必须同步交付期货,你必须立即对暂停和取消作出反应,并且你不能在创建未来和交付它之间引入其他延迟。)
我建议您重写创建 Stream<Future<T>>
的代码,只创建一个 Stream<T>
。如果没有看到原始代码,我不能说如何做到这一点,但您可能会立即等待使用 returns 期货的函数创建的未来,然后将值或错误作为流事件,而不是发送未来本身。