如何解决流中的承诺?

How do I resolve promises within stream?

我有一个高地流,其中每个元素都是一个获取请求的承诺:

const stream = _[promise1, promise2, promise3, ...];

所以当然当我 运行:

const stream.each(console.log)

我只看到:

Promise { <pending> }
Promise { <pending> }
Promise { <pending> }

但我想创建一个管道并处理承诺的实际结果。 (我不想使用 thenawait)。我想通过流管道解决这些承诺。

我想应该有一种方法可以让高地流只将已解决的承诺映射到新的流中,所以我希望能够将承诺流扁平化为它们的实际值流。

四处寻找,我想 flatMapflatten 应该做我正在寻找的东西,但我不知道如何继续,我所有的试错都失败了。

我试过了:

 stream.flatMap((id: number) => {
     return myAsyncGetRequest(id);
 }).each(console.log)

如何解决流中的承诺?

我偶然发现了一个 github 问题,the answer of quarterto 启发了我:

必须将 promise 包装在 Highland 中,flatMap 才能按预期工作:

 stream.flatMap((id: number) => {
     return Highland(myAsyncGetRequest(id));
 }).each(console.log)

该主题还阐明了为什么不等待承诺。

quarterto 表述为:

Highland isn't a Promise library. It can consume promises, like it can consume arrays, callbacks and event emitters, but its API is focused around transforming streamed values, not promises. It's like asking why Array.prototype.map doesn't wait for promises, or why Bluebird doesn't wait for Streams.

或者如vqvu所说:

Highland streams already represent a future array of values, so it doesn't really make sense to treat promises as something special.

Highland 不太适合您的需求,虽然我认为这是可能的。

看看 scramjet,它是一个构建在像 Highland 这样的标准节点流之上的框架,但它基于 Promises 在异步情况下的工作方式与同步情况完全相同。

以下是您在超燃冲压发动机中兑现承诺的方式:

const scramjet = require("scramjet");

const stream = scramjet.DataStream.fromArray([d1, d2, d3])
                   .map((data) => aCallThatReturnsAPromise(data))
                   .stringify((resolved) => JSON.stringify(resolved) + "\n")
                   .pipe(fs.createWriteStream('./yourfile.jslines');

由于 Highland 和 Scramjet 都在节点流之上工作,因此您可以根据需要轻松地通过管道连接到您以前的管道。而且它没有依赖性,所以你不会让你的软件变重一倍。 :)