运行 onCreate Firestore 事件后每 30 分钟执行一次 Cron 作业

Run a Cron Job every 30mins after onCreate Firestore event

我想要一个 cron job/scheduler,它会在 Firestore 中发生 onCreate 事件后每 30 分钟 运行。 cron 作业应该触发一个云函数,该函数选择最近 30 分钟内创建的文档——根据 json 模式验证它们——并将它们保存在另一个 collection.How 中,我是否实现了这一点,以编程方式编写这样一个调度程序? 什么也是故障安全机制和某种 queuing/tracking 在 cron 作业 运行 之前创建的文档以将它们推送到另一个集合。

一个简单的方法是,您可以添加一个带有时间戳的 created 字段,然后在预定义的时间段(例如,每分钟一次)安排函数 运行 并执行特定代码对于 created >= NOW - 31 mins AND created <= NOW - 30 mins 的所有记录(伪代码)。如果你的时间精度要求不是很高,那应该适用于大多数情况。

如果这不符合您的需求,您可以添加云任务(Google 云产品)。详情在此good article.

中指定

您可以在 Firestore Create 事件上触发云功能,这将在 30 分钟后安排云任务。这将有排队和重试机制。

使用 Firestore 构建队列非常简单,非常适合您的 use-case。这个想法是将 tasks 写入带有截止日期的 queue 集合,然后在截止日期时进行处理。

这是一个例子。

  1. 每当您的集合的初始 onCreate 事件发生时,将包含以下数据的文档写入 tasks 集合:
    duedate: new Date() + 30 minutes
    type: 'yourjob'
    status: 'scheduled'
    data: '...' // <-- put whatever data here you need to know when processing the task

  1. 让工人定期接取可用的工作 - 例如每分钟取决于您的需要
// Define what happens on what task type
const workers: Workers = {
  yourjob: (data) => db.collection('xyz').add({ foo: data }),
}


// The following needs to be scheduled

export const checkQueue = functions.https.onRequest(async (req, res) => {
  // Consistent timestamp
  const now = admin.firestore.Timestamp.now();
  // Check which tasks are due
  const query = db.collection('tasks').where('duedate', '<=', new Date()).where('status', '==', 'scheduled');
  const tasks = await query.get();
  // Process tasks and mark it in queue as done
  tasks.forEach(snapshot => {
    const { type, data } = snapshot.data();
    console.info('Executing job for task ' + JSON.stringify(type) + ' with data ' + JSON.stringify(data));
    const job = workers[type](data)
      // Update task doc with status or error
      .then(() => snapshot.ref.update({ status: 'complete' }))
      .catch((err) => {
        console.error('Error when executing worker', err);
        return snapshot.ref.update({ status: 'error' });
      });

    jobs.push(job);
  });
  return Promise.all(jobs).then(() => {
    res.send('ok');
    return true;
  }).catch((onError) => {
    console.error('Error', onError);
  });
});

如果有任务到期,您有不同的选项来触发队列检查:

  • 如上例所示使用 http 可调用函数。这需要您定期执行对此函数的 http 调用,以便它执行并检查是否有任务要完成。根据您的需要,您可以从自己的服务器执行此操作或使用 cron-job.org 之类的服务来执行调用。 请注意,HTTP 可调用函数将公开可用,其他人也可以调用它。但是,如果您使检查代码幂等,这应该不是问题。
  • 使用内部使用 Cloud Scheduler 的 Firebase "internal" cron option。使用它可以直接触发队列检查:
    export scheduledFunctionCrontab =
    functions.pubsub.schedule('* * * * *').onRun((context) => {
        console.log('This will be run every minute!');
        // Include code from checkQueue here from above
    });

使用这样的队列还可以使您的系统更加健壮 - 如果两者之间出现问题,您不会丢失以某种方式仅存在于内存中但只要它们未标记为已处理的任务,fixed 工人会捡起它们并重新处理它们。这当然取决于你的实现。