使用嵌套承诺创建承诺队列
Creating a queue of promises with nested promises
我正在实现一个查询引擎来批量获取和处理请求。我正在使用 async/await.
现在,执行流程 运行 在一个层次结构中,其中有一个包含查询的项目列表,并且每个查询都有一个提取。
我想做的是将项目打包成 n 组,因此即使每个项目都有 m 个查询并在其中提取,也只有 n*m 个请求同时 运行;并且特别是同一域只会同时发出一个请求。
问题是,当我等待项目的执行时(在外层,一段时间内对项目进行分组并将停止迭代直到承诺解决),当执行内部查询时,这些承诺正在解决由于提取的内部等待而被推迟。
这导致我的排队只是暂时停止,而不是等待内部承诺解决。
这是外面,排队class:
class AsyncItemQueue {
constructor(items, concurrency) {
this.items = items;
this.concurrency = concurrency;
}
run = async () => {
let itemPromises = [];
const bundles = Math.ceil(this.items.length / this.concurrency);
let currentBundle = 0;
while (currentBundle < bundles) {
console.log(`<--------- FETCHING ITEM BUNDLE ${currentBundle} OF ${bundles} --------->`);
const lowerRange = currentBundle * this.concurrency;
const upperRange = (currentBundle + 1) * this.concurrency;
itemPromises.push(
this.items.slice(lowerRange, upperRange).map(item => item.run())
);
await Promise.all(itemPromises);
currentBundle++;
}
};
}
export default AsyncItemQueue;
这是队列 运行ning 的简单项目 class。我省略了多余的代码。
class Item {
// ...
run = async () => {
console.log('Item RUN', this, this.name);
return await Promise.all(this.queries.map(query => {
const itemPromise = query.run(this.name);
return itemPromise;
}));
}
}
这是项目中包含的查询。每个项目都有一个查询列表。同样,删除了一些代码,因为它没有意义。
class Query {
// ...
run = async (item) => {
// Step 1: If requisites, await.
if (this.requires) {
await this.savedData[this.requires];
}
// Step 2: Resolve URL.
this.resolveUrl(item);
// Step 3: If provides, create promise in savedData.
const fetchPromise = this.fetch();
if (this.saveData) {
this.saveData.forEach(sd => (this.savedData[sd] = fetchPromise));
}
// Step 4: Fetch.
const document = await fetchPromise;
// ...
}
}
AsyncItemQueue
中的 while 正确停止,但直到执行流程到达 Query
中的步骤 3。一旦它到达那个 fetch,它是标准 fetch 函数的包装器,外部 promise 就会解决,我最终会同时执行所有请求。
我怀疑问题出在查询 class 的某个地方,但我对如何避免解决外部承诺感到困惑。
我尝试制作 Query
class run
函数 return 文档,以防万一,但无济于事。
任何想法或指导将不胜感激。我会尽力回答有关代码的任何问题,或者在需要时提供更多信息。
谢谢!
PS:这是一个带有工作示例的codesandbox:https://codesandbox.io/s/goofy-tesla-iwzem
正如您在控制台出口看到的那样,while 循环在提取完成之前进行迭代,并且它们都是同时执行的。
我已经解决了
问题出在 AsyncItemQueue
class。具体来说:
itemPromises.push(
this.items.slice(lowerRange, upperRange).map(item => item.run())
);
那是将承诺列表推入列表,因此,稍后:
await Promise.all(itemPromises);
在那个列表中没有找到任何等待的承诺(因为它包含更多列表,里面有承诺)。
解决方案是将代码更改为:
await Promise.all(this.items.slice(lowerRange, upperRange).map(item => item.run()));
现在一切正常。项目正在 运行 分批处理,每批处理 n 次,新的一批将 运行 直到前一批完成。
我不确定这对除我以外的任何人都有帮助,但我会把它留在这里,以防有一天有人发现类似的问题。感谢您的帮助。
我正在实现一个查询引擎来批量获取和处理请求。我正在使用 async/await.
现在,执行流程 运行 在一个层次结构中,其中有一个包含查询的项目列表,并且每个查询都有一个提取。
我想做的是将项目打包成 n 组,因此即使每个项目都有 m 个查询并在其中提取,也只有 n*m 个请求同时 运行;并且特别是同一域只会同时发出一个请求。
问题是,当我等待项目的执行时(在外层,一段时间内对项目进行分组并将停止迭代直到承诺解决),当执行内部查询时,这些承诺正在解决由于提取的内部等待而被推迟。
这导致我的排队只是暂时停止,而不是等待内部承诺解决。
这是外面,排队class:
class AsyncItemQueue {
constructor(items, concurrency) {
this.items = items;
this.concurrency = concurrency;
}
run = async () => {
let itemPromises = [];
const bundles = Math.ceil(this.items.length / this.concurrency);
let currentBundle = 0;
while (currentBundle < bundles) {
console.log(`<--------- FETCHING ITEM BUNDLE ${currentBundle} OF ${bundles} --------->`);
const lowerRange = currentBundle * this.concurrency;
const upperRange = (currentBundle + 1) * this.concurrency;
itemPromises.push(
this.items.slice(lowerRange, upperRange).map(item => item.run())
);
await Promise.all(itemPromises);
currentBundle++;
}
};
}
export default AsyncItemQueue;
这是队列 运行ning 的简单项目 class。我省略了多余的代码。
class Item {
// ...
run = async () => {
console.log('Item RUN', this, this.name);
return await Promise.all(this.queries.map(query => {
const itemPromise = query.run(this.name);
return itemPromise;
}));
}
}
这是项目中包含的查询。每个项目都有一个查询列表。同样,删除了一些代码,因为它没有意义。
class Query {
// ...
run = async (item) => {
// Step 1: If requisites, await.
if (this.requires) {
await this.savedData[this.requires];
}
// Step 2: Resolve URL.
this.resolveUrl(item);
// Step 3: If provides, create promise in savedData.
const fetchPromise = this.fetch();
if (this.saveData) {
this.saveData.forEach(sd => (this.savedData[sd] = fetchPromise));
}
// Step 4: Fetch.
const document = await fetchPromise;
// ...
}
}
AsyncItemQueue
中的 while 正确停止,但直到执行流程到达 Query
中的步骤 3。一旦它到达那个 fetch,它是标准 fetch 函数的包装器,外部 promise 就会解决,我最终会同时执行所有请求。
我怀疑问题出在查询 class 的某个地方,但我对如何避免解决外部承诺感到困惑。
我尝试制作 Query
class run
函数 return 文档,以防万一,但无济于事。
任何想法或指导将不胜感激。我会尽力回答有关代码的任何问题,或者在需要时提供更多信息。
谢谢!
PS:这是一个带有工作示例的codesandbox:https://codesandbox.io/s/goofy-tesla-iwzem
正如您在控制台出口看到的那样,while 循环在提取完成之前进行迭代,并且它们都是同时执行的。
我已经解决了
问题出在 AsyncItemQueue
class。具体来说:
itemPromises.push(
this.items.slice(lowerRange, upperRange).map(item => item.run())
);
那是将承诺列表推入列表,因此,稍后:
await Promise.all(itemPromises);
在那个列表中没有找到任何等待的承诺(因为它包含更多列表,里面有承诺)。
解决方案是将代码更改为:
await Promise.all(this.items.slice(lowerRange, upperRange).map(item => item.run()));
现在一切正常。项目正在 运行 分批处理,每批处理 n 次,新的一批将 运行 直到前一批完成。
我不确定这对除我以外的任何人都有帮助,但我会把它留在这里,以防有一天有人发现类似的问题。感谢您的帮助。