如何打破 SIGTERM 上的异步系列?
How to break Async series on SIGTERM?
假设我有以下场景 -
async.series(
[
function (cbi) {
students.getAll('student', function (err, response) {
if (err) {
logger.error(err);
}
cbi(err, response);
});
},
function (cbi) {
students.deleteAll('student', function (err, response) {
if (err) {
logger.error(err);
}
cbi(err, response);
});
},
function (cbi) {
teachers.getAll('teacher', function (err, response) {
if (err) {
logger.error(err);
}
cbi(err, response);
});
},
function (cbi) {
teachers.deleteAll('teacher', function (err, response) {
if (err) {
logger.error(err);
}
cbi(err, response);
});
};
]
);
并且我希望在发送 SIGTERM
时进行优雅的清理。这是对所有学生或所有教师的清理,无论哪个正在进行,当信号发送时应该完成并且下一个不应该开始。
function (cbi) {
students.getAll('student', function (err, response) {
if (err || GLOBAL_VAR_SIGTERM === true) {
logger.error(err);
}
cbi(err, response);
});
}
我在想我应该设置一个全局变量来跟踪 SIGTERM
信号。
process.on('SIGTERM', function onSigterm () {
GLOBAL_VAR_SIGTERM = true;
}
有没有更好的方法来打破异步系列来打破 SIGTERM
信号?
如果您想从 async.series()
中响应 SIGTERM
事件,那么您是对的,最简单的方法是使用全局变量进行跟踪。
但是你需要将cbi(err, response)
函数中的第一个参数(error-first callback)设置为true
才能打断系列。
所以:
if (err || GLOBAL_VAR_SIGTERM === true) {
logger.error(err);
}
应该更像是:
if (err) logger.error(err);
if (GLOBAL_VAR_SIGTERM) err = new Error("SIGTERM: Aborting remaining tasks");
// You could just do err = true
// But best practice is to use an Error instance.
然后因为 cbi(err, response)
将被调用 err
值等于 true
剩余的任务将不会是 运行.
正如@adamrights 在 中指出的那样,您代码中的主要问题是您没有使用真实的 err
第一个参数调用 cbi(err, response)
,这对于停止async.series
继续队列中的下一个任务。
现在您的代码应该可以工作了,但是您的代码中有一个重复模式:
function (cbi) {
students.getAll('student', function (err, response) {
// these 3 lines appear in every callback function
if (GLOBAL_VAR_SIGTERM) err = new Error("SIGTERM: Aborting remaining tasks");
if (err) logger.error(err);
cbi(err, response);
// end of repeat pattern
});
}
你传递给每个异步任务的回调总是做同样的三行事情。我们知道 DRY 规则,将重复模式提取到另一个函数中以尽可能重用它总是一个好主意。
因此,与其重复声明匿名函数,不如声明一个工厂函数。
function callbackFactory(cbi) {
return function(err, response) {
if (GLOBAL_VAR_SIGTERM) err = new Error("SIGTERM: Aborting remaining tasks");
if (err) logger.error(err);
cbi(err, response);
}
}
// use arrow function to write more concise code
async.series(
[
cbi => students.getAll('student', callbackFactory(cbi)),
cbi => students.deleteAll('student', callbackFactory(cbi)),
cbi => teachers.getAll('teacher', callbackFactory(cbi)),
cbi => teachers.deleteAll('teacher', callbackFactory(cbi)),
]
);
进阶话题:使用装饰器处理横切关注
让我们进一步探讨这个话题。显然,在收到 SIGTERM
时提前中止是一个横切关注点,应该与业务逻辑分开。假设您的业务逻辑因任务而异:
async.series(
[
cbi => students.getAll('student', (err, response) => {
if (err) {
logger.error(err);
return cbi(err);
}
updateStudentCount(response.data.length) // <- extra work
cbi(err, response);
}),
cbi => teachers.getAll('student', (err, response) => {
if (err) {
logger.error(err);
return cbi(err);
}
updateTeacherCount(response.data.length) // <- different extra work
cbi(err, response);
})
]
);
因为回调是变化的,所以很难像以前那样提取到工厂函数中。从这个角度来说,我们最好在每个任务中注入 abort-early 行为,方便编写正常的业务逻辑。
这是 装饰器模式 派上用场的地方。但是全局变量不是实现它的最佳工具,我们将使用事件监听器。
装饰器的基本界面如下:
// `task` will be things like `cbi => students.getAll('student', ... )`
function decorateTaskAbortEarly(task) {
return (originalCbi) => {
...
task(originalCbi)
}
}
以下是我们的实施清单:
- 如果我们收到
SIGTERM
,我们将致电 originalCbi
- 但是当我们没有收到
SIGTERM
时,originalCbi
仍然可以在任何异步任务的回调中调用,例如正常
- 如果
originalCbi
被调用一次,我们应该取消订阅 SIGTERM
以防止内存泄漏
实施:
function decorateTaskAbortEarly(task) {
return (originalCbi) => {
// subscribe to `SIGTERM`
var listener = () => originalCbi(new Error("SIGTERM: Aborting remaining tasks"));
process.once('SIGTERM', listener);
var wrappedCbi = (err, response) => {
// unsubscribe if `cbi` is called once
process.off('SIGTERM', listener);
return originalCbi(err, response);
};
// pass `cbi` through to `task`
task(wrappedCbi);
}
}
// Usage:
async.series(
[
cbi => students.getAll('student', (err, response) => {
if (err) {
logger.error(err);
return cbi(err);
}
updateStudentCount(response.data.length)
cbi(err, response);
}),
cbi => teachers.getAll('student', (err, response) => {
if (err) {
logger.error(err);
return cbi(err);
}
updateTeacherCount(response.data.length)
cbi(err, response);
})
].map(decorateTaskAbortEarly) // <--- nice API
);
我喜欢其他答案。这是实现相同目的的另一种方式。我正在使用我自己的示例:
var async = require('async');
var ifAsync = require('if-async')
var GLOBAL_VAR_SIGTERM = false;
async.series({
one: ifAsync(notsigterm).then(function (callback) {
setTimeout(function () {
console.log('one');
callback(null, 1);
}, 1000);
}),
two: ifAsync(notsigterm).then(function (callback) {
setTimeout(function () {
console.log('two');
callback(null, 2);
}, 1000);
}),
three: ifAsync(notsigterm).then(function (callback) {
setTimeout(function () {
console.log('three');
callback(null, 3);
}, 1000);
}),
four: ifAsync(notsigterm).then(function (callback) {
setTimeout(function () {
console.log('four');
callback(null, 4);
}, 1000);
}),
}, function (err, results) {
if (err) {
//Handle the error in some way. Here we simply throw it
//Other options: pass it on to an outer callback, log it etc.
throw err;
}
console.log('Results are ' + JSON.stringify(results));
});
process.on('SIGTERM', function onSigterm () {
console.log('SIGTERM caught');
GLOBAL_VAR_SIGTERM = true;
});
function notsigterm(callback) {
if (!GLOBAL_VAR_SIGTERM) return callback(null, true)
else return callback(null, false)
}
我正在使用一个名为 ifAsync
的包,它允许您使用谓词 notsigterm
来决定是否应调用回调。如果 notsigterm
returns true 则回调将被调用,否则将被跳过。这是对其他人的类似回答,但不知何故我发现这个更清洁。如果您有任何疑问,请告诉我。
假设我有以下场景 -
async.series(
[
function (cbi) {
students.getAll('student', function (err, response) {
if (err) {
logger.error(err);
}
cbi(err, response);
});
},
function (cbi) {
students.deleteAll('student', function (err, response) {
if (err) {
logger.error(err);
}
cbi(err, response);
});
},
function (cbi) {
teachers.getAll('teacher', function (err, response) {
if (err) {
logger.error(err);
}
cbi(err, response);
});
},
function (cbi) {
teachers.deleteAll('teacher', function (err, response) {
if (err) {
logger.error(err);
}
cbi(err, response);
});
};
]
);
并且我希望在发送 SIGTERM
时进行优雅的清理。这是对所有学生或所有教师的清理,无论哪个正在进行,当信号发送时应该完成并且下一个不应该开始。
function (cbi) {
students.getAll('student', function (err, response) {
if (err || GLOBAL_VAR_SIGTERM === true) {
logger.error(err);
}
cbi(err, response);
});
}
我在想我应该设置一个全局变量来跟踪 SIGTERM
信号。
process.on('SIGTERM', function onSigterm () {
GLOBAL_VAR_SIGTERM = true;
}
有没有更好的方法来打破异步系列来打破 SIGTERM
信号?
如果您想从 async.series()
中响应 SIGTERM
事件,那么您是对的,最简单的方法是使用全局变量进行跟踪。
但是你需要将cbi(err, response)
函数中的第一个参数(error-first callback)设置为true
才能打断系列。
所以:
if (err || GLOBAL_VAR_SIGTERM === true) {
logger.error(err);
}
应该更像是:
if (err) logger.error(err);
if (GLOBAL_VAR_SIGTERM) err = new Error("SIGTERM: Aborting remaining tasks");
// You could just do err = true
// But best practice is to use an Error instance.
然后因为 cbi(err, response)
将被调用 err
值等于 true
剩余的任务将不会是 运行.
正如@adamrights 在 err
第一个参数调用 cbi(err, response)
,这对于停止async.series
继续队列中的下一个任务。
现在您的代码应该可以工作了,但是您的代码中有一个重复模式:
function (cbi) {
students.getAll('student', function (err, response) {
// these 3 lines appear in every callback function
if (GLOBAL_VAR_SIGTERM) err = new Error("SIGTERM: Aborting remaining tasks");
if (err) logger.error(err);
cbi(err, response);
// end of repeat pattern
});
}
你传递给每个异步任务的回调总是做同样的三行事情。我们知道 DRY 规则,将重复模式提取到另一个函数中以尽可能重用它总是一个好主意。
因此,与其重复声明匿名函数,不如声明一个工厂函数。
function callbackFactory(cbi) {
return function(err, response) {
if (GLOBAL_VAR_SIGTERM) err = new Error("SIGTERM: Aborting remaining tasks");
if (err) logger.error(err);
cbi(err, response);
}
}
// use arrow function to write more concise code
async.series(
[
cbi => students.getAll('student', callbackFactory(cbi)),
cbi => students.deleteAll('student', callbackFactory(cbi)),
cbi => teachers.getAll('teacher', callbackFactory(cbi)),
cbi => teachers.deleteAll('teacher', callbackFactory(cbi)),
]
);
进阶话题:使用装饰器处理横切关注
让我们进一步探讨这个话题。显然,在收到 SIGTERM
时提前中止是一个横切关注点,应该与业务逻辑分开。假设您的业务逻辑因任务而异:
async.series(
[
cbi => students.getAll('student', (err, response) => {
if (err) {
logger.error(err);
return cbi(err);
}
updateStudentCount(response.data.length) // <- extra work
cbi(err, response);
}),
cbi => teachers.getAll('student', (err, response) => {
if (err) {
logger.error(err);
return cbi(err);
}
updateTeacherCount(response.data.length) // <- different extra work
cbi(err, response);
})
]
);
因为回调是变化的,所以很难像以前那样提取到工厂函数中。从这个角度来说,我们最好在每个任务中注入 abort-early 行为,方便编写正常的业务逻辑。
这是 装饰器模式 派上用场的地方。但是全局变量不是实现它的最佳工具,我们将使用事件监听器。
装饰器的基本界面如下:
// `task` will be things like `cbi => students.getAll('student', ... )`
function decorateTaskAbortEarly(task) {
return (originalCbi) => {
...
task(originalCbi)
}
}
以下是我们的实施清单:
- 如果我们收到
SIGTERM
,我们将致电 - 但是当我们没有收到
SIGTERM
时,originalCbi
仍然可以在任何异步任务的回调中调用,例如正常 - 如果
originalCbi
被调用一次,我们应该取消订阅SIGTERM
以防止内存泄漏
originalCbi
实施:
function decorateTaskAbortEarly(task) {
return (originalCbi) => {
// subscribe to `SIGTERM`
var listener = () => originalCbi(new Error("SIGTERM: Aborting remaining tasks"));
process.once('SIGTERM', listener);
var wrappedCbi = (err, response) => {
// unsubscribe if `cbi` is called once
process.off('SIGTERM', listener);
return originalCbi(err, response);
};
// pass `cbi` through to `task`
task(wrappedCbi);
}
}
// Usage:
async.series(
[
cbi => students.getAll('student', (err, response) => {
if (err) {
logger.error(err);
return cbi(err);
}
updateStudentCount(response.data.length)
cbi(err, response);
}),
cbi => teachers.getAll('student', (err, response) => {
if (err) {
logger.error(err);
return cbi(err);
}
updateTeacherCount(response.data.length)
cbi(err, response);
})
].map(decorateTaskAbortEarly) // <--- nice API
);
我喜欢其他答案。这是实现相同目的的另一种方式。我正在使用我自己的示例:
var async = require('async');
var ifAsync = require('if-async')
var GLOBAL_VAR_SIGTERM = false;
async.series({
one: ifAsync(notsigterm).then(function (callback) {
setTimeout(function () {
console.log('one');
callback(null, 1);
}, 1000);
}),
two: ifAsync(notsigterm).then(function (callback) {
setTimeout(function () {
console.log('two');
callback(null, 2);
}, 1000);
}),
three: ifAsync(notsigterm).then(function (callback) {
setTimeout(function () {
console.log('three');
callback(null, 3);
}, 1000);
}),
four: ifAsync(notsigterm).then(function (callback) {
setTimeout(function () {
console.log('four');
callback(null, 4);
}, 1000);
}),
}, function (err, results) {
if (err) {
//Handle the error in some way. Here we simply throw it
//Other options: pass it on to an outer callback, log it etc.
throw err;
}
console.log('Results are ' + JSON.stringify(results));
});
process.on('SIGTERM', function onSigterm () {
console.log('SIGTERM caught');
GLOBAL_VAR_SIGTERM = true;
});
function notsigterm(callback) {
if (!GLOBAL_VAR_SIGTERM) return callback(null, true)
else return callback(null, false)
}
我正在使用一个名为 ifAsync
的包,它允许您使用谓词 notsigterm
来决定是否应调用回调。如果 notsigterm
returns true 则回调将被调用,否则将被跳过。这是对其他人的类似回答,但不知何故我发现这个更清洁。如果您有任何疑问,请告诉我。