Promise Chaining 竞争条件
Promise Chaining Race-Condition
我目前正在研究一个相当简单的逻辑来处理排队的 ZPL 打印作业,这些作业存储在一个数组中,然后迭代该数组,将每个作业的 n 份副本发送到打印机。
我正在将数组缩减为一个承诺链,为每个将副本发送到打印机的作业混合一个子链。对打印机的调用是同步的(我知道......)所以我将它们中的每一个都包装到一个 Promise 中,该 Promise 仅在打印机收到副本时才解析,从而确保顺序处理。
在传输失败的情况下,当前的承诺会拒绝并在主链中捕获一个人为错误。
到目前为止的理论,唉,子链之间似乎存在一种竞争条件。
我已经尽力了,但是就是看不出来...
这里是一些简化的代码 + fiddle,请注意子链是如何 运行 连续的:
['job1', 'job2'].reduce((pMain, item, curIndex) => {
var pSub = Promise.resolve();
for (let i = 0; i < 2; i++) pSub = pSub.then(() => new Promise((resolve, reject) => setTimeout(reject, 2000)));
return pMain.then(() => pSub);
}, Promise.resolve())
.then(() => /* print all done */)
.catch( handleError );
jsfiddle with console.logs here
非常感谢任何建议。被困在如此微不足道的事情上真是太可惜了。
您的 pSub
链全部创建并 运行 在 reduce
调用期间同步。要成为连续的,他们需要进入 then
回调:
['job1', 'job2'].reduce((pMain, item, curIndex) => {
return pMain.then(() => {
var pSub = Promise.resolve();
for (let i = 0; i < 2; i++)
pSub = pSub.then(() => new Promise((resolve, reject) => setTimeout(reject, 2000)));
return pSub;
});
}, Promise.resolve())
或者在两个循环中只构建一个链:
['job1', 'job2'].reduce((promise, item, outerIndex) => {
return Array.from({length: 2}).reduce((promise, _, innerIndex) => {
return promise.then(() => new Promise((resolve, reject) => setTimeout(reject, 2000)));
}, promise);
}, Promise.resolve())
当然 @jfriend 是对的,对于顺序任务你应该只写 async
/await
代码:
for (const item of ['job1', 'job2']) {
for (let i = 0; i < 2; i++) {
await new Promise((resolve, reject) => setTimeout(reject, 2000));
}
}
您还可以使用该解决方案轻松地将 try
块放在正确的水平上。
我想有很多方法可以实现这一点,但就我个人而言,我经常做的是创建一个函数数组,return一个 Promise(一个 PromiseFactory,你可能会说)。
const promiseFactoryA = () => {
return new Promise(resolve => {
console.log('PromiseA started...');
setTimeout(() => {
console.log('PromiseA resolved after 300ms');
resolve();
})
}, 300);
}
const promiseFactories = [promiseFactoryA, promiseFactoryA];
然后,我们可以将数组传递给此函数,该函数将按顺序 运行 它们:
const runPromiseSequentially = promiseFactories => {
let result = Promise.resolve();
promiseFactories.forEach(
(promiseFactory) => {
result = result.then(() => promiseFactory());
},
);
return result;
}
runPromiseSequentially(promiseFactories);
基本上它所做的就是在我们想要开始操作时要求 PromiseFactory 创建 Promise。
示例REPL
不过,如果您可以使用 async
和 await
,那么就没有必要了。
所以,到目前为止,您已经了解在使用 .reduce()
序列化 promise 时做错了什么。在评论中,我提出了一些建议,您可以:
- 使用现代
async/await
(必要时进行转译)
- 使用已经提供异步迭代的预构建库
- Write/use 一些经过测试的实用函数,您可以使用它们来代替每次手动编写
.reduce()
循环。
如果#1 或#2 不切实际,我建议您自己制作经过测试的实用函数,因为 .reduce()
序列化方法很容易出错,而且对于还没有看过的人来说并不总是微不足道的代码知道它在做什么,而一个适当命名的实用函数编写和测试一次更容易使用和理解(一旦函数被编写)并且显然它也使重用变得实用。
对于预构建的库,Bluebird 和 Async 都具有这些功能(个人而言,我更喜欢 Bluebird)并且我自己在嵌入式项目中使用过 Bluebird(Raspberry Pi)运行 旧版本的 JS。
至于测试的实用功能,这里有几个你可以快速使用。
iterateAsync()
就像一个异步 .forEach()
mapAsync()
就像一个异步 .map()
reduceAsync()
就像一个异步 .reduce()
所有函数都将一个数组作为第一个参数,returns一个函数作为第二个参数。这些是 ES5 兼容的,但请假设 Promise
可用。以下是三个函数:
// iterate an array sequentially, calling a function (that returns a promise)
// on each element of the array
// The final resolved value is whatever the last call to fn(item) resolves to
// like an asynchronous .forEach()
function iterateAsync(array, fn) {
return array.reduce(function(p, item) {
return p.then(function() {
return fn(item);
});
}, Promise.resolve());
}
// iterate an array sequentially, calling a function (that returns a promise)
// on each element of the array
// The final resolved value is an array of what all the fn(item) calls resolved to
// like an asynchronous .map()
function mapAsync(array, fn) {
var results = [];
return array.reduce(function(p, item) {
return p.then(function() {
return fn(item).then(function(val) {
results.push(val);
return val;
});
});
}, Promise.resolve()).then(function() {
return results;
});
}
// iterate an array sequentially, calling a function fn(item, val)
// (that returns a promise) on each element of the array. Like array.reduce(),
// the next fn(accumulator, item) is passed the previous resolved value and the promise
// that fn() returns should resolve to the value you want passed to the next
// link in the chain
// The final resolved value is whatever the last call to fn(item, val) resolves to
// like an asynchronous .reduce()
function reduceAsync(array, fn, initialVal) {
return array.reduce(function(p, item) {
return p.then(function(accumulator) {
return fn(accumulator, item);
});
}, Promise.resolve(initialVal));
}
请注意,使用现代 Javascript 功能(特别是 async/await
),所有这些通常更简单,因此这些主要用于当这些现代功能不可用或转译不实用时。
为了完整起见,我要补充一点,以这种方式使用 .reduce()
可能不适用于迭代非常大的数组。这是因为它所做的是同步预构建承诺链 p.then().then().then().then()
,其中 .then()
的数量等于数组的长度。如果您的数组非常大(数万或数十万个元素长),这可能需要大量内存来预构建所有这些承诺并将它们链接在一起。
对于像您提到的 "limited environment" 中的非常大的数组,您可能希望像这样手动迭代更多,它不会预先构建任何大型结构,并且一次只使用一个承诺:
function iterateAsync(list, fn) {
return new Promise(function(resolve, reject) {
var index = 0;
function next(val) {
if (index < list.length) {
try {
fn(list[index++]).then(next, reject);
} catch(e) {
reject(e);
}
} else {
resolve(val);
}
}
next();
});
}
我目前正在研究一个相当简单的逻辑来处理排队的 ZPL 打印作业,这些作业存储在一个数组中,然后迭代该数组,将每个作业的 n 份副本发送到打印机。
我正在将数组缩减为一个承诺链,为每个将副本发送到打印机的作业混合一个子链。对打印机的调用是同步的(我知道......)所以我将它们中的每一个都包装到一个 Promise 中,该 Promise 仅在打印机收到副本时才解析,从而确保顺序处理。
在传输失败的情况下,当前的承诺会拒绝并在主链中捕获一个人为错误。
到目前为止的理论,唉,子链之间似乎存在一种竞争条件。
我已经尽力了,但是就是看不出来...
这里是一些简化的代码 + fiddle,请注意子链是如何 运行 连续的:
['job1', 'job2'].reduce((pMain, item, curIndex) => {
var pSub = Promise.resolve();
for (let i = 0; i < 2; i++) pSub = pSub.then(() => new Promise((resolve, reject) => setTimeout(reject, 2000)));
return pMain.then(() => pSub);
}, Promise.resolve())
.then(() => /* print all done */)
.catch( handleError );
jsfiddle with console.logs here
非常感谢任何建议。被困在如此微不足道的事情上真是太可惜了。
您的 pSub
链全部创建并 运行 在 reduce
调用期间同步。要成为连续的,他们需要进入 then
回调:
['job1', 'job2'].reduce((pMain, item, curIndex) => {
return pMain.then(() => {
var pSub = Promise.resolve();
for (let i = 0; i < 2; i++)
pSub = pSub.then(() => new Promise((resolve, reject) => setTimeout(reject, 2000)));
return pSub;
});
}, Promise.resolve())
或者在两个循环中只构建一个链:
['job1', 'job2'].reduce((promise, item, outerIndex) => {
return Array.from({length: 2}).reduce((promise, _, innerIndex) => {
return promise.then(() => new Promise((resolve, reject) => setTimeout(reject, 2000)));
}, promise);
}, Promise.resolve())
当然 @jfriend 是对的,对于顺序任务你应该只写 async
/await
代码:
for (const item of ['job1', 'job2']) {
for (let i = 0; i < 2; i++) {
await new Promise((resolve, reject) => setTimeout(reject, 2000));
}
}
您还可以使用该解决方案轻松地将 try
块放在正确的水平上。
我想有很多方法可以实现这一点,但就我个人而言,我经常做的是创建一个函数数组,return一个 Promise(一个 PromiseFactory,你可能会说)。
const promiseFactoryA = () => {
return new Promise(resolve => {
console.log('PromiseA started...');
setTimeout(() => {
console.log('PromiseA resolved after 300ms');
resolve();
})
}, 300);
}
const promiseFactories = [promiseFactoryA, promiseFactoryA];
然后,我们可以将数组传递给此函数,该函数将按顺序 运行 它们:
const runPromiseSequentially = promiseFactories => {
let result = Promise.resolve();
promiseFactories.forEach(
(promiseFactory) => {
result = result.then(() => promiseFactory());
},
);
return result;
}
runPromiseSequentially(promiseFactories);
基本上它所做的就是在我们想要开始操作时要求 PromiseFactory 创建 Promise。
示例REPL
不过,如果您可以使用 async
和 await
,那么就没有必要了。
所以,到目前为止,您已经了解在使用 .reduce()
序列化 promise 时做错了什么。在评论中,我提出了一些建议,您可以:
- 使用现代
async/await
(必要时进行转译) - 使用已经提供异步迭代的预构建库
- Write/use 一些经过测试的实用函数,您可以使用它们来代替每次手动编写
.reduce()
循环。
如果#1 或#2 不切实际,我建议您自己制作经过测试的实用函数,因为 .reduce()
序列化方法很容易出错,而且对于还没有看过的人来说并不总是微不足道的代码知道它在做什么,而一个适当命名的实用函数编写和测试一次更容易使用和理解(一旦函数被编写)并且显然它也使重用变得实用。
对于预构建的库,Bluebird 和 Async 都具有这些功能(个人而言,我更喜欢 Bluebird)并且我自己在嵌入式项目中使用过 Bluebird(Raspberry Pi)运行 旧版本的 JS。
至于测试的实用功能,这里有几个你可以快速使用。
iterateAsync()
就像一个异步 .forEach()
mapAsync()
就像一个异步 .map()
reduceAsync()
就像一个异步 .reduce()
所有函数都将一个数组作为第一个参数,returns一个函数作为第二个参数。这些是 ES5 兼容的,但请假设 Promise
可用。以下是三个函数:
// iterate an array sequentially, calling a function (that returns a promise)
// on each element of the array
// The final resolved value is whatever the last call to fn(item) resolves to
// like an asynchronous .forEach()
function iterateAsync(array, fn) {
return array.reduce(function(p, item) {
return p.then(function() {
return fn(item);
});
}, Promise.resolve());
}
// iterate an array sequentially, calling a function (that returns a promise)
// on each element of the array
// The final resolved value is an array of what all the fn(item) calls resolved to
// like an asynchronous .map()
function mapAsync(array, fn) {
var results = [];
return array.reduce(function(p, item) {
return p.then(function() {
return fn(item).then(function(val) {
results.push(val);
return val;
});
});
}, Promise.resolve()).then(function() {
return results;
});
}
// iterate an array sequentially, calling a function fn(item, val)
// (that returns a promise) on each element of the array. Like array.reduce(),
// the next fn(accumulator, item) is passed the previous resolved value and the promise
// that fn() returns should resolve to the value you want passed to the next
// link in the chain
// The final resolved value is whatever the last call to fn(item, val) resolves to
// like an asynchronous .reduce()
function reduceAsync(array, fn, initialVal) {
return array.reduce(function(p, item) {
return p.then(function(accumulator) {
return fn(accumulator, item);
});
}, Promise.resolve(initialVal));
}
请注意,使用现代 Javascript 功能(特别是 async/await
),所有这些通常更简单,因此这些主要用于当这些现代功能不可用或转译不实用时。
为了完整起见,我要补充一点,以这种方式使用 .reduce()
可能不适用于迭代非常大的数组。这是因为它所做的是同步预构建承诺链 p.then().then().then().then()
,其中 .then()
的数量等于数组的长度。如果您的数组非常大(数万或数十万个元素长),这可能需要大量内存来预构建所有这些承诺并将它们链接在一起。
对于像您提到的 "limited environment" 中的非常大的数组,您可能希望像这样手动迭代更多,它不会预先构建任何大型结构,并且一次只使用一个承诺:
function iterateAsync(list, fn) {
return new Promise(function(resolve, reject) {
var index = 0;
function next(val) {
if (index < list.length) {
try {
fn(list[index++]).then(next, reject);
} catch(e) {
reject(e);
}
} else {
resolve(val);
}
}
next();
});
}