如果满足任何条件,则触发函数的执行
Trigger the execution of a function if any condition is met
我正在 Node.js 中使用 expressjs 编写 HTTP API,这就是我想要实现的目标:
- 我有一项定期任务,我想定期 运行,大约每分钟一次。此任务是使用名为
task
. 的异步函数实现的
- 响应我的 API 中的呼叫,我也希望立即调用该任务
task
函数的两次执行不能同时进行。每次执行都应该 运行 在另一个执行开始之前完成。
代码如下所示:
// only a single execution of this function is allowed at a time
// which is not the case with the current code
async function task(reason: string) {
console.log("do thing because %s...", reason);
await sleep(1000);
console.log("done");
}
// call task regularly
setIntervalAsync(async () => {
await task("ticker");
}, 5000) // normally 1min
// call task immediately
app.get("/task", async (req, res) => {
await task("trigger");
res.send("ok");
});
我已经在 https://github.com/piec/question.js
上放置了一个完整的示例项目
如果我在 go 中,我会像 this 那样做,这很容易,但我不知道如何使用 Node.js。
我考虑过或尝试过的想法:
- 我显然可以使用 async-mutex 库中的互斥锁将
task
放入关键部分。但是我不太喜欢在js代码中添加互斥量。
- 许多人似乎正在使用带有工作进程的消息队列库(bee-queue、bullmq 等),但这通常会增加对外部服务(如 redis)的依赖。另外,如果我是正确的,代码会稍微复杂一些,因为我需要一个主入口点和一个工作进程入口点。此外,您不能像在“正常”单进程情况下那样轻松地与工作人员共享对象。
- 我已经尝试 RxJs subject 来创建一个生产者消费者频道。但是我无法将
task
的执行限制为一次一个(task
是异步的)。
谢谢!
这是一个使用 RxJS#Subject 的版本,几乎可以正常工作。如何完成它取决于您的用例。
async function task(reason: string) {
console.log("do thing because %s...", reason);
await sleep(1000);
console.log("done");
}
const run = new Subject<string>();
const effect$ = run.pipe(
// Limit one task at a time
concatMap(task),
share()
);
const effectSub = effect$.subscribe();
interval(5000).subscribe(_ =>
run.next("ticker")
);
// call task immediately
app.get("/task", async (req, res) => {
effect$.pipe(
take(1)
).subscribe(_ =>
res.send("ok")
);
run.next("trigger");
});
这里的问题是 res.send("ok")
链接到下一次发射的 effect$
流。这可能不是您要调用的 run.next
生成的。
有很多方法可以解决这个问题。例如,您可以使用 ID 标记每个发射,然后在使用 res.send("ok")
.
之前等待相应的发射
如果调用自然地区分自己,还有更好的方法。
笨拙的 ID 版本
随机生成一个 ID 不是一个好主意,但它得到了普遍的支持。您可以根据需要生成唯一 ID。它们可以以某种方式直接集成到任务中,或者可以像它们在这里一样保持 100% 独立(任务本身不知道它在 运行 之前被分配了一个 ID)。
interface IdTask {
taskId: number,
reason: string
}
interface IdResponse {
taskId: number,
response: any
}
async function task(reason: string) {
console.log("do thing because %s...", reason);
await sleep(1000);
console.log("done");
}
const run = new Subject<IdTask>();
const effect$: Observable<IdResponse> = run.pipe(
// concatMap only allows one observable at a time to run
concatMap((eTask: IdTask) => from(task(eTask.reason)).pipe(
map((response:any) => ({
taskId: eTask.taskId,
response
})as IdResponse)
)),
share()
);
const effectSub = effect$.subscribe({
next: v => console.log("This is a shared task emission: ", v)
});
interval(5000).subscribe(num =>
run.next({
taskId: num,
reason: "ticker"
})
);
// call task immediately
app.get("/task", async (req, res) => {
const randomId = Math.random();
effect$.pipe(
filter(({taskId}) => taskId == randomId),
take(1)
).subscribe(_ =>
res.send("ok")
);
run.next({
taskId: randomId,
reason: "trigger"
});
});
您可以创建自己的序列化异步队列并运行任务通过它。
这个队列使用一个标志来跟踪它是否已经在 运行 执行异步操作。如果是这样,它只是将任务添加到队列中,并在当前操作完成后 运行 它。如果没有,现在 运行 就可以了。将其添加到队列 return 是一个承诺,因此调用者可以知道任务何时最终到达 运行。
如果任务是异步的,则它们需要 return 链接到异步 activity 的承诺。您也可以混合使用非异步任务,它们也会被序列化。
class SerializedAsyncQueue {
constructor() {
this.tasks = [];
this.inProcess = false;
}
// adds a promise-returning function and its args to the queue
// returns a promise that resolves when the function finally gets to run
add(fn, ...args) {
let d = new Deferred();
this.tasks.push({ fn, args: ...args, deferred: d });
this.check();
return d.promise;
}
check() {
if (!this.inProcess && this.tasks.length) {
// run next task
this.inProcess = true;
const nextTask = this.tasks.shift();
Promise.resolve(nextTask.fn(...nextTask.args)).then(val => {
this.inProcess = false;
nextTask.deferred.resolve(val);
this.check();
}).catch(err => {
console.log(err);
this.inProcess = false;
nextTask.deferred.reject(err);
this.check();
});
}
}
}
const Deferred = function() {
if (!(this instanceof Deferred)) {
return new Deferred();
}
const p = this.promise = new Promise((resolve, reject) => {
this.resolve = resolve;
this.reject = reject;
});
this.then = p.then.bind(p);
this.catch = p.catch.bind(p);
if (p.finally) {
this.finally = p.finally.bind(p);
}
}
let queue = new SerializedAsyncQueue();
// utility function
const sleep = function(t) {
return new Promise(resolve => {
setTimeout(resolve, t);
});
}
// only a single execution of this function is allowed at a time
// so it is run only via the queue that makes sure it is serialized
async function task(reason: string) {
function runIt() {
console.log("do thing because %s...", reason);
await sleep(1000);
console.log("done");
}
return queue.add(runIt);
}
// call task regularly
setIntervalAsync(async () => {
await task("ticker");
}, 5000) // normally 1min
// call task immediately
app.get("/task", async (req, res) => {
await task("trigger");
res.send("ok");
});
我正在 Node.js 中使用 expressjs 编写 HTTP API,这就是我想要实现的目标:
- 我有一项定期任务,我想定期 运行,大约每分钟一次。此任务是使用名为
task
. 的异步函数实现的
- 响应我的 API 中的呼叫,我也希望立即调用该任务
task
函数的两次执行不能同时进行。每次执行都应该 运行 在另一个执行开始之前完成。
代码如下所示:
// only a single execution of this function is allowed at a time
// which is not the case with the current code
async function task(reason: string) {
console.log("do thing because %s...", reason);
await sleep(1000);
console.log("done");
}
// call task regularly
setIntervalAsync(async () => {
await task("ticker");
}, 5000) // normally 1min
// call task immediately
app.get("/task", async (req, res) => {
await task("trigger");
res.send("ok");
});
我已经在 https://github.com/piec/question.js
上放置了一个完整的示例项目如果我在 go 中,我会像 this 那样做,这很容易,但我不知道如何使用 Node.js。
我考虑过或尝试过的想法:
- 我显然可以使用 async-mutex 库中的互斥锁将
task
放入关键部分。但是我不太喜欢在js代码中添加互斥量。 - 许多人似乎正在使用带有工作进程的消息队列库(bee-queue、bullmq 等),但这通常会增加对外部服务(如 redis)的依赖。另外,如果我是正确的,代码会稍微复杂一些,因为我需要一个主入口点和一个工作进程入口点。此外,您不能像在“正常”单进程情况下那样轻松地与工作人员共享对象。
- 我已经尝试 RxJs subject 来创建一个生产者消费者频道。但是我无法将
task
的执行限制为一次一个(task
是异步的)。
谢谢!
这是一个使用 RxJS#Subject 的版本,几乎可以正常工作。如何完成它取决于您的用例。
async function task(reason: string) {
console.log("do thing because %s...", reason);
await sleep(1000);
console.log("done");
}
const run = new Subject<string>();
const effect$ = run.pipe(
// Limit one task at a time
concatMap(task),
share()
);
const effectSub = effect$.subscribe();
interval(5000).subscribe(_ =>
run.next("ticker")
);
// call task immediately
app.get("/task", async (req, res) => {
effect$.pipe(
take(1)
).subscribe(_ =>
res.send("ok")
);
run.next("trigger");
});
这里的问题是 res.send("ok")
链接到下一次发射的 effect$
流。这可能不是您要调用的 run.next
生成的。
有很多方法可以解决这个问题。例如,您可以使用 ID 标记每个发射,然后在使用 res.send("ok")
.
如果调用自然地区分自己,还有更好的方法。
笨拙的 ID 版本
随机生成一个 ID 不是一个好主意,但它得到了普遍的支持。您可以根据需要生成唯一 ID。它们可以以某种方式直接集成到任务中,或者可以像它们在这里一样保持 100% 独立(任务本身不知道它在 运行 之前被分配了一个 ID)。
interface IdTask {
taskId: number,
reason: string
}
interface IdResponse {
taskId: number,
response: any
}
async function task(reason: string) {
console.log("do thing because %s...", reason);
await sleep(1000);
console.log("done");
}
const run = new Subject<IdTask>();
const effect$: Observable<IdResponse> = run.pipe(
// concatMap only allows one observable at a time to run
concatMap((eTask: IdTask) => from(task(eTask.reason)).pipe(
map((response:any) => ({
taskId: eTask.taskId,
response
})as IdResponse)
)),
share()
);
const effectSub = effect$.subscribe({
next: v => console.log("This is a shared task emission: ", v)
});
interval(5000).subscribe(num =>
run.next({
taskId: num,
reason: "ticker"
})
);
// call task immediately
app.get("/task", async (req, res) => {
const randomId = Math.random();
effect$.pipe(
filter(({taskId}) => taskId == randomId),
take(1)
).subscribe(_ =>
res.send("ok")
);
run.next({
taskId: randomId,
reason: "trigger"
});
});
您可以创建自己的序列化异步队列并运行任务通过它。
这个队列使用一个标志来跟踪它是否已经在 运行 执行异步操作。如果是这样,它只是将任务添加到队列中,并在当前操作完成后 运行 它。如果没有,现在 运行 就可以了。将其添加到队列 return 是一个承诺,因此调用者可以知道任务何时最终到达 运行。
如果任务是异步的,则它们需要 return 链接到异步 activity 的承诺。您也可以混合使用非异步任务,它们也会被序列化。
class SerializedAsyncQueue {
constructor() {
this.tasks = [];
this.inProcess = false;
}
// adds a promise-returning function and its args to the queue
// returns a promise that resolves when the function finally gets to run
add(fn, ...args) {
let d = new Deferred();
this.tasks.push({ fn, args: ...args, deferred: d });
this.check();
return d.promise;
}
check() {
if (!this.inProcess && this.tasks.length) {
// run next task
this.inProcess = true;
const nextTask = this.tasks.shift();
Promise.resolve(nextTask.fn(...nextTask.args)).then(val => {
this.inProcess = false;
nextTask.deferred.resolve(val);
this.check();
}).catch(err => {
console.log(err);
this.inProcess = false;
nextTask.deferred.reject(err);
this.check();
});
}
}
}
const Deferred = function() {
if (!(this instanceof Deferred)) {
return new Deferred();
}
const p = this.promise = new Promise((resolve, reject) => {
this.resolve = resolve;
this.reject = reject;
});
this.then = p.then.bind(p);
this.catch = p.catch.bind(p);
if (p.finally) {
this.finally = p.finally.bind(p);
}
}
let queue = new SerializedAsyncQueue();
// utility function
const sleep = function(t) {
return new Promise(resolve => {
setTimeout(resolve, t);
});
}
// only a single execution of this function is allowed at a time
// so it is run only via the queue that makes sure it is serialized
async function task(reason: string) {
function runIt() {
console.log("do thing because %s...", reason);
await sleep(1000);
console.log("done");
}
return queue.add(runIt);
}
// call task regularly
setIntervalAsync(async () => {
await task("ticker");
}, 5000) // normally 1min
// call task immediately
app.get("/task", async (req, res) => {
await task("trigger");
res.send("ok");
});