JavaScript array .reduce with async/await

JavaScript array .reduce with async/await

似乎在将 async/await 与 .reduce() 合并时遇到了一些问题,例如:

const data = await bodies.reduce(async(accum, current, index) => {
  const methodName = methods[index]
  const method = this[methodName]
  if (methodName == 'foo') {
    current.cover = await this.store(current.cover, id)
    console.log(current)
    return {
      ...accum,
      ...current
    }
  }
  return {
    ...accum,
    ...method(current.data)
  }
}, {})
console.log(data)

data 对象在 this.store 完成之前 被记录...

我知道您可以将 Promise.all 用于异步循环,但这是否适用于 .reduce()

问题是您的累加器值是承诺 - 它们是 async function 的 return 值。要获得顺序评估(以及除了最后一次迭代之外的所有迭代),您需要使用

const data = await array.reduce(async (accumP, current, index) => {
  const accum = await accumP;
  …
}, Promise.resolve(…));

也就是说,对于 async/await,我通常会推荐 ,它们性能更高,而且通常更简单。

您可以将整个 map/reduce 迭代器块包装到它们自己的 Promise.resolve 中并等待其完成。但是,问题在于累加器不包含您在每次迭代中期望的结果 data/object 。由于内部 async/await/Promise 链,累加器将是实际的 Promises 本身,尽管在您调用商店之前使用了 await 关键字,但它们可能尚未自行解决(这可能会让您相信迭代实际上不会return 直到该调用完成并更新累加器。

虽然这不是最优雅的解决方案,但您可以选择将 data 对象变量移出范围并将其分配为 let 以便可以发生适当的绑定和突变。然后在 async/await/Promise 调用解析时从迭代器内部更新此数据对象。

/* allow the result object to be initialized outside of scope 
   rather than trying to spread results into your accumulator on iterations, 
   else your results will not be maintained as expected within the 
   internal async/await/Promise chain.
*/    
let data = {}; 

await Promise.resolve(bodies.reduce(async(accum, current, index) => {
  const methodName = methods[index]
  const method = this[methodName];
  if (methodName == 'foo') {
    // note: this extra Promise.resolve may not be entirely necessary
    const cover = await Promise.resolve(this.store(current.cover, id));
    current.cover = cover;
    console.log(current);
    data = {
      ...data,
      ...current,
    };
    return data;
  }
  data = {
    ...data,
    ...method(current.data)
  };
  return data;
}, {});
console.log(data);

我喜欢 Bergi 的回答,我认为这是正确的方法。

我还想提一下我的图书馆,叫做 Awaity.js

这让您可以毫不费力地使用 reducemapfilter 等函数以及 async / await:

import reduce from 'awaity/reduce';

const posts = await reduce([1,2,3], async (posts, id) => {

  const res = await fetch('/api/posts/' + id);
  const post = await res.json();

  return {
    ...posts,
    [id]: post
  };
}, {})

posts // { 1: { ... }, 2: { ... }, 3: { ... } }

export const addMultiTextData = async(data) => {
  const textData = await data.reduce(async(a, {
    currentObject,
    selectedValue
  }) => {
    const {
      error,
      errorMessage
    } = await validate(selectedValue, currentObject);
    return {
      ...await a,
      [currentObject.id]: {
        text: selectedValue,
        error,
        errorMessage
      }
    };
  }, {});
};

[未解决 OP 的确切概率;专注于登陆这里的其他人。]

当您需要前面步骤的结果才能处理下一步时,通常会使用 Reduce。在这种情况下,您可以将 promises 串在一起 la:

promise = elts.reduce(
    async (promise, elt) => {
        return promise.then(async last => {
            return await f(last, elt)
        })
    }, Promise.resolve(0)) // or "" or [] or ...

这是一个使用 fs.promise.mkdir() 的示例(当然,使用 mkdirSync 更简单,但在我的例子中,它是跨网络的):

const Path = require('path')
const Fs = require('fs')

async function mkdirs (path) {
    return path.split(/\//).filter(d => !!d).reduce(
        async (promise, dir) => {
            return promise.then(async parent => {
                const ret = Path.join(parent, dir);
                try {
                    await Fs.promises.lstat(ret)
                } catch (e) {
                    console.log(`mkdir(${ret})`)
                    await Fs.promises.mkdir(ret)
                }
                return ret
            })
        }, Promise.resolve(""))
}

mkdirs('dir1/dir2/dir3')

下面是另一个将 100 + 200 ... 500 相加并稍等片刻的示例:

async function slowCounter () {
    const ret = await ([100, 200, 300, 400, 500]).reduce(
        async (promise, wait, idx) => {
            return promise.then(async last => {
                const ret = last + wait
                console.log(`${idx}: waiting ${wait}ms to return ${ret}`)
                await new Promise((res, rej) => setTimeout(res, wait))
                return ret
            })
        }, Promise.resolve(0))
    console.log(ret)
}

slowCounter ()

以下是异步归约的方法:

async function asyncReduce(arr, fn, initialValue) {
  let temp = initialValue;

  for (let idx = 0; idx < arr.length; idx += 1) {
    const cur = arr[idx];

    temp = await fn(temp, cur, idx);
  }

  return temp;
}

有时最好的办法就是简单地将同步和异步的两个代码版本并排放置:

同步版本:

const arr = [1, 2, 3, 4, 5];

const syncRev = arr.reduce((acc, i) => [i, ...acc], []); // [5, 4, 3, 2, 1] 

异步一:

(async () => { 
   const asyncRev = await arr.reduce(async (promisedAcc, i) => {
      const id = await asyncIdentity(i); // could be id = i, just stubbing async op.
      const acc = await promisedAcc;
      return [id, ...acc];
   }, Promise.resolve([]));   // [5, 4, 3, 2, 1] 
})();

//async stuff
async function asyncIdentity(id) {
   return Promise.resolve(id);
}

const arr = [1, 2, 3, 4, 5];
(async () => {
    const asyncRev = await arr.reduce(async (promisedAcc, i) => {
        const id = await asyncIdentity(i);
        const acc = await promisedAcc;
        return [id, ...acc];
    }, Promise.resolve([]));

    console.log('asyncRev :>> ', asyncRev);
})();

const syncRev = arr.reduce((acc, i) => [i, ...acc], []);

console.log('syncRev :>> ', syncRev);

async function asyncIdentity(id) {
    return Promise.resolve(id);
}

Bluebird

的另一个经典选项
const promise = require('bluebird');

promise.reduce([1,2,3], (agg, x) => Promise.resolve(agg+x),0).then(console.log);

// Expected to product sum 6

当前接受的答案建议使用 Promise.all() 而不是 async reduce。但是,这与 async reduce 的行为不同,并且仅适用于您希望异常立即停止所有迭代的情况,但情况并非总是如此。

此外,在该答案的评论中,建议您始终将 await 累加器作为 reducer 中的第一个语句,否则您可能会面临未处理的 promise 拒绝的风险。张贴者还说这是 OP 要求的,但事实并非如此。相反,他只想知道一切何时完成。为了知道你确实需要做 await acc,但这可能在 reducer 中的任何一点。

const reducer = async(acc, key) => {
  const response = await api(item);

  return {
    ...await acc, // <-- this would work just as well for OP
    [key]: reponse,
  }
}
const result = await ['a', 'b', 'c', 'd'].reduce(reducer, {});
console.log(result); // <-- Will be the final result

如何安全使用asyncreduce

也就是说,以这种方式使用减速器确实意味着您需要保证它不会抛出,否则您将得到“未处理的承诺拒绝”。完全有可能通过使用 try-catch 来确保这一点,catch 块返回累加器(可选地带有失败的 API 调用的记录)。

const reducer = async (acc, key) => {
    try {
        data = await doSlowTask(key);
        return {...await acc, [key]: data};
    } catch (error) {
        return {...await acc, [key]: {error}};
    };
}
const result = await ['a', 'b', 'c','d'].reduce(reducer, {});

Promise.allSettled的区别 您可以使用 Promise.allSettled 来接近 async reduce 的行为(带有错误捕获)。然而,这使用起来很笨拙:如果你想减少到一个对象,你需要在它之后添加另一个同步减少。

Promise.allSettled + 常规 reduce 的理论时间复杂度也更高,尽管可能很少有这样的用例会产生影响。 async reduce 可以从第一项完成的那一刻开始累积,而 Promise.allSettled 之后的 reduce 会被阻塞,直到所有承诺都得到履行。当遍历大量元素时,这可能会有所不同。

const responseTime = 200; //ms
function sleep(ms) {
    return new Promise(resolve => setTimeout(resolve, ms));
}

const api = async (key) => {
    console.log(`Calling API for ${ key }`);
    // Boz is a slow endpoint.
    await sleep(key === 'boz' ? 800 : responseTime);
    console.log(`Got response for ${ key }`);

    if (key === 'bar') throw new Error(`It doesn't work for ${ key }`);

    return {
        [key]: `API says ${ key }`,
    };
};

const keys = ['foo', 'bar', 'baz', 'buz', 'boz'];

const reducer = async (acc, key) => {
    let data;
    try {
        const response = await api(key);
        data = {
            apiData: response
        };
    } catch (e) {
        data = {
            error: e.message
        };
    }

    // OP doesn't care how this works, he only wants to know when the whole thing is ready.
    const previous = await acc;
    console.log(`Got previous for ${ key }`);

    return {
        ...previous,
        [key]: {
            ...data
        },
    };
};
(async () => {
    const start = performance.now();
    const result = await keys.reduce(reducer, {});
    console.log(`After ${ performance.now() - start }ms`, result); // <-- OP wants to execute things when it's ready.
})();

检查执行顺序 Promise.allSettled:

const responseTime = 200; //ms
function sleep(ms) {
    return new Promise(resolve => setTimeout(resolve, ms));
}

const api = async (key) => {
    console.log(`Calling API for ${ key }`);
    // Boz is a slow endpoint.
    await sleep(key === 'boz' ? 800 : responseTime);
    console.log(`Got response for ${ key }`);

    if (key === 'bar') throw new Error(`It doesn't work for ${ key }`);

    return {
        key,
        data: `API says ${ key }`,
    };
};

const keys = ['foo', 'bar', 'baz', 'buz', 'boz'];

(async () => {
    const start = performance.now();
    const apiResponses = await Promise.allSettled(keys.map(api));
    const result = apiResponses.reduce((acc, {status, reason, value}) => {
        const {key, data} = value || {};
        console.log(`Got previous for ${ key }`);
        return {
            ...acc,
            [key]: status === 'fulfilled' ? {apiData: data} : {error: reason.message},
        };
    }, {});
    console.log(`After ${ performance.now() - start }ms`, result); // <-- OP wants to execute things when it's ready.
})();

对于打字稿,前值和初始值需要相同。

const data = await array.reduce(async (accumP: Promise<Tout>, curr<Tin>) => {
    const accum: Tout = await accumP;
    
    doSomeStuff...

    return accum;

}, Promise<Tout>.resolve({} as Tout);