异步库是否有任何用于处理管道的控制流?
Does the async library have any control flow for handling pipelines?
我正在查看 async 库,但似乎找不到用于处理管道的控制流。我只是想知道我是否遗漏了什么。
我想实现一个管道。示例:
let pipeline = [];
pipeline.push((input, next) => { next(null, input); });
pipeline.push((input, next) => { next(null, input); });
var pipelineResult = pipelineRunner.run(pipeline, 'sample input', (error, result) => {});
解释:调用了一系列函数。每个函数接收一个 input
和一个 next
函数。每个函数处理 input
并将其作为参数传递给 next
函数。作为管道执行的结果,我得到了处理后的 input
,或者,如果任何函数调用 next
出错,管道停止并调用回调。
我想这是一个很常见的用例,所以我认为 async 可以做到,但我找不到它。如果您知道任何其他库可以达到这样的结果,那也是可以接受的。
您正在寻找 async.waterfall
function。
或者,如果您需要一个可以将初始 input
传递给的函数,您可以 apply asyc.seq
or async.compose
使用多个参数。
我最终自己实现了它,尽管正如@Bergi 刚刚展示的那样,async
确实支持它。
/**
* Runs asynchronous pipelines
*/
class PipelineRunner {
/**
* Runs the given pipeline
* @param pipeline - The array of functions that should be executed (middleware)
* @param middlewareArgs - The array of arguments that should be passed in to the middleware
* @param input
* @param next
*/
run(pipeline, middlewareArgs, input, next) {
if (!pipeline) throw Error('\'pipeline\' should be truthy');
if (!context) throw Error('\'context\' should be truthy');
if (!input) throw Error('\'input\' should be truthy');
if (!next) throw Error('\'next\' should be truthy');
if (!pipeline.length) throw Error('\'pipeline.length\' should be truthy');
let index = 0;
// the link function "binds" every function in the pipeline array together
let link = (error, result) => {
if (error) {
next(error);
return;
}
let nextIndex = index++;
if (nextIndex < pipeline.length) {
let args = [result].concat(middlewareArgs).concat(link);
pipeline[nextIndex].apply(null, args);
}
else {
next(null, result);
}
};
let args = [input].concat(middlewareArgs).concat(link);
pipeline[index++].apply(null, args);
}
}
export default new PipelineRunner();
单元测试:
import chai from 'chai';
import pipelineRunner from '../src/server/lib/pipelines/pipelineRunner';
let assert = chai.assert;
describe('PipelineRunner', () => {
describe('run', function() {
it('Happy path', () => {
let pipeline = [];
pipeline.push((input, next) => { next(null, input); });
pipeline.push((input, next) => { next(null, input); });
pipelineRunner.run(pipeline, [], 'happy', (error, result) => {
assert.strictEqual(result, "happy");
});
});
it('Happy path - with arguments', () => {
let pipeline = [];
pipeline.push((input, someArgument, next) => {
assert.strictEqual(someArgument, 'something that should be passed in');
next(null, input);
});
pipeline.push((input, someArgument, next) => { next(null, input); });
pipelineRunner.run(pipeline, ['something that should be passed in'], 'happy', (error, result) => {
assert.strictEqual(result, "happy");
});
});
it('When something goes wrong', () => {
let pipeline = [];
pipeline.push((input, next) => { next(null, input); });
pipeline.push((input, next) => { next('something went wrong'); });
pipelineRunner.run(pipeline, [], 'happy', (error, result) => {
assert.strictEqual(error, 'something went wrong');
});
});
});
});
我正在查看 async 库,但似乎找不到用于处理管道的控制流。我只是想知道我是否遗漏了什么。
我想实现一个管道。示例:
let pipeline = [];
pipeline.push((input, next) => { next(null, input); });
pipeline.push((input, next) => { next(null, input); });
var pipelineResult = pipelineRunner.run(pipeline, 'sample input', (error, result) => {});
解释:调用了一系列函数。每个函数接收一个 input
和一个 next
函数。每个函数处理 input
并将其作为参数传递给 next
函数。作为管道执行的结果,我得到了处理后的 input
,或者,如果任何函数调用 next
出错,管道停止并调用回调。
我想这是一个很常见的用例,所以我认为 async 可以做到,但我找不到它。如果您知道任何其他库可以达到这样的结果,那也是可以接受的。
您正在寻找 async.waterfall
function。
或者,如果您需要一个可以将初始 input
传递给的函数,您可以 apply asyc.seq
or async.compose
使用多个参数。
我最终自己实现了它,尽管正如@Bergi 刚刚展示的那样,async
确实支持它。
/**
* Runs asynchronous pipelines
*/
class PipelineRunner {
/**
* Runs the given pipeline
* @param pipeline - The array of functions that should be executed (middleware)
* @param middlewareArgs - The array of arguments that should be passed in to the middleware
* @param input
* @param next
*/
run(pipeline, middlewareArgs, input, next) {
if (!pipeline) throw Error('\'pipeline\' should be truthy');
if (!context) throw Error('\'context\' should be truthy');
if (!input) throw Error('\'input\' should be truthy');
if (!next) throw Error('\'next\' should be truthy');
if (!pipeline.length) throw Error('\'pipeline.length\' should be truthy');
let index = 0;
// the link function "binds" every function in the pipeline array together
let link = (error, result) => {
if (error) {
next(error);
return;
}
let nextIndex = index++;
if (nextIndex < pipeline.length) {
let args = [result].concat(middlewareArgs).concat(link);
pipeline[nextIndex].apply(null, args);
}
else {
next(null, result);
}
};
let args = [input].concat(middlewareArgs).concat(link);
pipeline[index++].apply(null, args);
}
}
export default new PipelineRunner();
单元测试:
import chai from 'chai';
import pipelineRunner from '../src/server/lib/pipelines/pipelineRunner';
let assert = chai.assert;
describe('PipelineRunner', () => {
describe('run', function() {
it('Happy path', () => {
let pipeline = [];
pipeline.push((input, next) => { next(null, input); });
pipeline.push((input, next) => { next(null, input); });
pipelineRunner.run(pipeline, [], 'happy', (error, result) => {
assert.strictEqual(result, "happy");
});
});
it('Happy path - with arguments', () => {
let pipeline = [];
pipeline.push((input, someArgument, next) => {
assert.strictEqual(someArgument, 'something that should be passed in');
next(null, input);
});
pipeline.push((input, someArgument, next) => { next(null, input); });
pipelineRunner.run(pipeline, ['something that should be passed in'], 'happy', (error, result) => {
assert.strictEqual(result, "happy");
});
});
it('When something goes wrong', () => {
let pipeline = [];
pipeline.push((input, next) => { next(null, input); });
pipeline.push((input, next) => { next('something went wrong'); });
pipelineRunner.run(pipeline, [], 'happy', (error, result) => {
assert.strictEqual(error, 'something went wrong');
});
});
});
});