将 promise All 转换为渐进的 promises resolve(例如每 3 个 promises)不起作用

converting promiseAll to gradual promises resolve(every 3promises for example) does not work

我有一个承诺列表,目前我正在使用 promiseAll 来解决它们

这是我现在的代码:

   const pageFutures = myQuery.pages.map(async (pageNumber: number) => {
      const urlObject: any = await this._service.getResultURL(searchRecord.details.id, authorization, pageNumber);
      if (!urlObject.url) {
        // throw error
      }
      const data = await rp.get({
        gzip: true,
        headers: {
          "Accept-Encoding": "gzip,deflate",
        },
        json: true,
        uri: `${urlObject.url}`,
      })

      const objects = data.objects.filter((object: any) => object.type === "observed-data" && object.created);
      return new Promise((resolve, reject) => {
        this._resultsDatastore.bulkInsert(
          databaseName,
          objects
        ).then(succ => {
          resolve(succ)
        }, err => {
          reject(err)
        })
      })
    })
    const all: any = await Promise.all(pageFutures).catch(e => {
       console.log(e)
     }) 

正如你在这里看到的,我使用了 promise all 并且它有效:

const all: any = await Promise.all(pageFutures).catch(e => {
   console.log(e)
 }) 

但是我注意到它会影响数据库性能,所以我决定一次解决每 3 个问题。 为此,我正在考虑不同的方式,如 cwait、异步池或编写我自己的迭代器 但我对如何做到这一点感到困惑?

例如当我使用cwait时:

let promiseQueue = new TaskQueue(Promise,3);
const all=new Promise.map(pageFutures, promiseQueue.wrap(()=>{}));

我不知道在包装内传递什么所以我现在传递 ()=>{} 加上我得到

Property 'map' does not exist on type 'PromiseConstructor'. 

所以无论我如何让它工作(我自己的迭代器或任何库),只要我对它有很好的理解,我都可以接受。 如果有人能阐明这一点并帮助我摆脱这种困惑,我将不胜感激?

首先,您问了一个关于失败解决方案尝试的问题。那叫X/Y problem.

所以事实上,据我了解你的问题,你想延迟一些数据库请求。

您不想延迟 解析 由数据库请求创建的 Promise... 喜欢 不!不要那样做! 当数据库 return 结果时,promise 将解析。干扰该过程是个坏主意。

我用你试过的库撞了一会儿脑袋……但我无法解决你的问题。所以我想到了循环数据并设置一些超时的想法。

我在这里做了一个可运行的演示:Delaying DB request in small batch

这是代码。请注意,我模拟了一些数据和一个数据库请求。你将不得不适应它。您还必须调整超时延迟。整整一秒肯定太长了。

// That part is to simulate some data you would like to save.

// Let's make it a random amount for fun.
let howMuch = Math.ceil(Math.random()*20)

// A fake data array...
let someData = []
for(let i=0; i<howMuch; i++){
  someData.push("Data #"+i)
}

console.log("Some feak data")
console.log(someData)
console.log("")

// So we have some data that look real. (lol)
// We want to save it by small group

// And that is to simulate your DB request.
let saveToDB = (data, dataIterator) => {
  console.log("Requesting DB...")
  return new Promise(function(resolve, reject) {
    resolve("Request #"+dataIterator+" complete.");
  })
}

// Ok, we have everything. Let's proceed!
let batchSize = 3 // The amount of request to do at once.
let delay = 1000  // The delay between each batch.

// Loop through all the data you have.
for(let i=0;i<someData.length;i++){

  if(i%batchSize == 0){
    console.log("Splitting in batch...")

    // Process a batch on one timeout.
    let timeout = setTimeout(() => {

      // An empty line to clarify the console.
      console.log("")

      // Grouping the request by the "batchSize" or less if we're almost done.
      for(let j=0;j<batchSize;j++){

        // If there still is data to process.
        if(i+j < someData.length){

          // Your real database request goes here.
          saveToDB(someData[i+j], i+j).then(result=>{
            console.log(result)
            // Do something with the result.
            // ...
          })

        } // END if there is still data.

      } // END sending requests for that batch.

    },delay*i)  // Timeout delay.

  } // END splitting in batch.

} // END for each data.

先说几点:

  • 确实,在您当前的设置中,数据库可能必须同时处理多个批量插入。但是这种并发性并不是 使用 Promise.all 引起的。即使您从代码中遗漏了 Promise.all ,它仍然会有这种行为。那是因为承诺已经创建,所以数据库请求将以任何方式执行。

  • 与您的问题无关,但不要使用 promise constructor antipattern:当您已经有承诺时,无需使用 new Promise 创建承诺手:bulkInsert()return是一个承诺,所以return那个。

由于您担心的是数据库负载,我会将 pageFutures 承诺启动的工作限制在非数据库方面:它们不必等待彼此的解决方案,因此代码可以保持原样。

让这些承诺与您当前存储在 objects 中的内容一起解决:您想要插入的数据。然后将所有这些数组连接成一个大数组,并将其提供给 one 数据库 bulkInsert() 调用。

这是它的样子:

const pageFutures = myQuery.pages.map(async (pageNumber: number) => {
  const urlObject: any = await this._service.getResultURL(searchRecord.details.id, 
                                                         authorization, pageNumber);
  if (!urlObject.url) { // throw error }
  const data = await rp.get({
    gzip: true,
    headers: { "Accept-Encoding": "gzip,deflate" },
    json: true,
    uri: `${urlObject.url}`,
  });
  // Return here, don't access the database yet...
  return data.objects.filter((object: any) => object.type === "observed-data" 
                                           && object.created);
});
const all: any = await Promise.all(pageFutures).catch(e => {
  console.log(e);
  return []; // in case of error, still return an array
}).flat(); // flatten it, so all data chunks are concatenated in one long array
// Don't create a new Promise with `new`, only to wrap an other promise. 
//   It is an antipattern. Use the promise returned by `bulkInsert`
return this._resultsDatastore.bulkInsert(databaseName, objects);

这使用了相当新的 .flat()。如果您不支持它,请查看 mdn.

上提供的替代方案