如果满足任何条件,则触发函数的执行

Trigger the execution of a function if any condition is met

我正在 Node.js 中使用 expressjs 编写 HTTP API,这就是我想要实现的目标:

代码如下所示:

// only a single execution of this function is allowed at a time
// which is not the case with the current code
async function task(reason: string) {
  console.log("do thing because %s...", reason);
  await sleep(1000);
  console.log("done");
}

// call task regularly
setIntervalAsync(async () => {
  await task("ticker");
}, 5000) // normally 1min

// call task immediately
app.get("/task", async (req, res) => {
  await task("trigger");
  res.send("ok");
});

我已经在 https://github.com/piec/question.js

上放置了一个完整的示例项目

如果我在 go 中,我会像 this 那样做,这很容易,但我不知道如何使用 Node.js。


我考虑过或尝试过的想法:

谢谢!

这是一个使用 RxJS#Subject 的版本,几乎可以正常工作。如何完成它取决于您的用例。

async function task(reason: string) {
  console.log("do thing because %s...", reason);
  await sleep(1000);
  console.log("done");
}

const run = new Subject<string>();
const effect$ = run.pipe(
  // Limit one task at a time
  concatMap(task),
  share()
);
const effectSub = effect$.subscribe();

interval(5000).subscribe(_ => 
  run.next("ticker")
);

// call task immediately
app.get("/task", async (req, res) => {
  effect$.pipe(
    take(1)
  ).subscribe(_ =>
    res.send("ok")
  );
  run.next("trigger");
});

这里的问题是 res.send("ok") 链接到下一次发射的 effect$ 流。这可能不是您要调用的 run.next 生成的。

有很多方法可以解决这个问题。例如,您可以使用 ID 标记每个发射,然后在使用 res.send("ok").

之前等待相应的发射

如果调用自然地区分自己,还有更好的方法。


笨拙的 ID 版本

随机生成一个 ID 不是一个好主意,但它得到了普遍的支持。您可以根据需要生成唯一 ID。它们可以以某种方式直接集成到任务中,或者可以像它们在这里一样保持 100% 独立(任务本身不知道它在 运行 之前被分配了一个 ID)。


interface IdTask {
  taskId: number,
  reason: string
}

interface IdResponse {
  taskId: number,
  response: any
}

async function task(reason: string) {
  console.log("do thing because %s...", reason);
  await sleep(1000);
  console.log("done");
}

const run = new Subject<IdTask>();
const effect$: Observable<IdResponse> = run.pipe(
  // concatMap only allows one observable at a time to run
  concatMap((eTask: IdTask) => from(task(eTask.reason)).pipe(
    map((response:any) => ({
      taskId: eTask.taskId,
      response
    })as IdResponse)
  )),
  share()
);
const effectSub = effect$.subscribe({
  next: v => console.log("This is a shared task emission: ", v)
});

interval(5000).subscribe(num => 
  run.next({
    taskId: num,
    reason: "ticker"
  })
);

// call task immediately
app.get("/task", async (req, res) => {
  const randomId = Math.random();
  effect$.pipe(
    filter(({taskId}) => taskId == randomId),
    take(1)
  ).subscribe(_ =>
    res.send("ok")
  );
  run.next({
    taskId: randomId,
    reason: "trigger"
  });
});

您可以创建自己的序列化异步队列并运行任务通过它。

这个队列使用一个标志来跟踪它是否已经在 运行 执行异步操作。如果是这样,它只是将任务添加到队列中,并在当前操作完成后 运行 它。如果没有,现在 运行 就可以了。将其添加到队列 return 是一个承诺,因此调用者可以知道任务何时最终到达 运行。

如果任务是异步的,则它们需要 return 链接到异步 activity 的承诺。您也可以混合使用非异步任务,它们也会被序列化。

class SerializedAsyncQueue {
    constructor() {
        this.tasks = [];
        this.inProcess = false;
    }
    // adds a promise-returning function and its args to the queue
    // returns a promise that resolves when the function finally gets to run
    add(fn, ...args) {
        let d = new Deferred();
        this.tasks.push({ fn, args: ...args, deferred: d });
        this.check();
        return d.promise;
    }
    check() {
        if (!this.inProcess && this.tasks.length) {
            // run next task
            this.inProcess = true;
            const nextTask = this.tasks.shift();
            Promise.resolve(nextTask.fn(...nextTask.args)).then(val => {
                this.inProcess = false;
                nextTask.deferred.resolve(val);
                this.check();
            }).catch(err => {
                console.log(err);
                this.inProcess = false;
                nextTask.deferred.reject(err);
                this.check();
            });
        }
    }
}

const Deferred = function() {
    if (!(this instanceof Deferred)) {
        return new Deferred();
    }
    const p = this.promise = new Promise((resolve, reject) => {
        this.resolve = resolve;
        this.reject = reject;
    });
    this.then = p.then.bind(p);
    this.catch = p.catch.bind(p);
    if (p.finally) {
        this.finally = p.finally.bind(p);
    }
}


let queue = new SerializedAsyncQueue();

// utility function
const sleep = function(t) {
    return new Promise(resolve => {
        setTimeout(resolve, t);
    });
}

// only a single execution of this function is allowed at a time
// so it is run only via the queue that makes sure it is serialized
async function task(reason: string) {
    function runIt() {
        console.log("do thing because %s...", reason);
        await sleep(1000);
        console.log("done");
    }
    return queue.add(runIt);
}

// call task regularly
setIntervalAsync(async () => {
    await task("ticker");
}, 5000) // normally 1min

// call task immediately
app.get("/task", async (req, res) => {
    await task("trigger");
    res.send("ok");
});