从异步函数生成生成器的值而无需等待

Yield values from generator from async functions without awaiting

我需要同时 运行 许多异步函数并在它们完成时产生结果,顺序无关紧要。

这是我在一个简化的例子中得到的,当然这不能正常工作,因为它在移动到下一个请求之前等待每个响应。

 Stream<String> stringGenerator(List<http.Request> requests) async* {
    final httpClient = http.Client();
    for (var req in requests) {
      final response = await httpClient.send(req);
      yield response.headers['example'];
    }
  }

你能试试看这对你有用吗?

Stream<String> stringGenerator(List<http.Request> requests) {
  final controller = StreamController<String>();
  final httpClient = http.Client();

  Future.wait(requests.map((req) => httpClient
          .send(req)
          .then((response) => controller.add(response.headers['example']!))))
      .whenComplete(() => controller.close());

  return controller.stream;
}

更正确的说法是这样,因为我们不想在根据 StreamController 的文档侦听事件之前生成事件。这对于内部使用来说真的不是问题,因为 StreamController 会缓冲事件,直到订阅一个监听器:

Stream<String> stringGenerator(List<http.Request> requests) {
  final controller = StreamController<String>();
  
  controller.onListen = () {
    final httpClient = http.Client();

    Future.wait(requests.map((req) => httpClient
        .send(req)
        .then((response) => controller.add(response.headers['example']!))))
        .whenComplete(() => controller.close());
  };

  return controller.stream;
}

@julemand101 解决方案的通用替代方案,适用于任何类型的期货:

Stream<T> fromFutures<T>(Iterable<Future<T>> futures) {
  var pending = 0;
  var controller = Controller<T>();
  for (var future in futures) {
    pending++;
    future.then((v) {
      controller.add(v);
      if (--pending == 0) controller.close();
    }, onError: (e, s) {
      controller.addError(e, s);
      if (--pending == 0) controller.close();
    });
  }
  return controller.stream;
}

您可以使用此指定 stringGenerator 作为:

Stream<String> stringGenerator(List<http.Request> requests) async* {
  var client = http.Client();
  yield* fromFutures(requests.map(client.send));
}