依次执行在节点应用程序中接收到的 webhooks

Sequentially execute webhooks received in node application

我有一个使用 koa 的节点应用程序。它从特定资源上的外部应用程序接收 webhooks。

为了说明,让 webhook 向我发送 POST 请求这种类型的对象:

{
  'resource_id':'<SomeID>',
  'resource_origin':'<SomeResourceOrigin>',
  'value' : '<SomeValue>'
}

我想按顺序执行来自同一来源的任何资源,以避免与我的执行相关的资源不同步。

我正在考虑使用数据库作为锁并使用 cron 为同源的每个资源顺序执行我的进程。

但我不确定这是最有效的方法。

所以我的问题是:

您是否知道一些 method/package/service 允许我使用可以为每个来源实施的全局队列,以确保来自同一来源的资源将同步执行,而无需顺序处理所有 webhooks?如果不使用数据库就更好了

如果我是你,我会首先序列化所有 webhook 的处理。换句话说,我建议你一次处理一个,不管它们来自哪里。在您的 nodejs 应用程序中使用一个简单的队列。

(一旦你确信它工作正常,你就可以根据来源序列化它们。)

首先,构建您的函数(我们称它为 handleOneWebhook())以将传入的 webhook 处理为 Promise 或异步函数。然后您可以使用带有此大纲的代码调用它们。

let busy= false
async function handleManyWebhooks (queue) {
    if (busy) return
    busy = true
    while (queue.length > 0) {
        const item = queue.shift() 
        await handleOneWebhook (item)
    }
    busy  = false
}

您传递给 handleManyWebhooksqueue 是一个简单的数组,其中每个元素都是来自 POST 请求的对象。您将它用作队列:push() 每个对象将其放入队列,shift() 将其删除。

然后,每当您收到一个 webhook POST 对象时,您就会使用带有此大纲的代码。

const queue = []
...

function handlePostObject (postObject) {
   queue.push(postObject)
   handleManyWebooks (queue)
}

即使您为每个传入对象调用 handleManyWebhooks 一次,busy 标志也确保它一次只处理一个对象。

请注意,这是一个非常简单的解决方案。一旦你让它正常工作,两个可能的改进就会出现。

  1. 使用比简单数组更有效的队列。 shift()不是很快

  2. 为每个单独的源创建一个具有自己的 busy 标志的单独队列对象。然后,您将能够并行处理来自不同来源的 webhook,同时仍然序列化来自每个来源的 webhook 流。

我决定使用的解决方案

post 讨论的小摘要

作为 Ivan Rubinson 让我知道我的问题只是 producer-consumer problem.

所以我最终选择了使用RabbitMQ because I have a huge amount of webhook to process. For peoples having a small amount of request to process and do not want use external tools 确实是解决问题的好方法

解决方案设计

我终于安装并配置了 RabbitMQ 服务器,然后我为 web-hooks 的每个来源创建了一个队列。

制作人

在生产者方面,当我收到 web-hook 数据时,我将一条消息发送到与我的 web-hook 来源相对应的队列,其中包含处理实际行 ID 所需的序列化信息数据库使消息尽可能轻。

消费者

在消费者方面,我为每个源队列创建一个消费者函数,并将获取策略设置为一个,以便在每个队列中一个一个地处理消息最后,我将通道策略设置为在发送下一个之前等待确认消息信息 。使用此配置,消费者会逐条处理消息并解决最初的问题。

实施

制作人

   async function create(){
        await   amqp.connect(RBMQ_CONNECTION_STRING).then(async (conn)=>{
            await conn.createChannel().then(async (ch)=>{
                global.channel_publisher=ch;
            });
        });
    }

    async function sendtask(queue,task){
        if(!global.channel_publisher){
            await create();
        }
        global.channel_publisher.assertQueue(queue).then((ok)=>{
            global.channel_publisher.sendToQueue(queue, Buffer.from(task));
        });
    }

我在收到 web-hook

的地方使用 sendtask(queue,task) 功能

消费者

   async function create(){
      await amqp.connect(RBMQ_CONNECTION_STRING).then(async (conn)=>{
         await conn.createChannel().then(async (ch)=>{
            ch.prefetch(1);
            global.channel_consumer=ch;
          });
       });
    }

   async function consumeTask(queue){
       if(!global.channel_consumer){
           await create();
       }

       global.channel_consumer.assertQueue(queue).then((ok)=>{
          global.channel_consumer.consume(queue,(message)=>{
               const args=message.content.toString().split(';');

                    await processWebhooks(args);
                    global.channel_consumer.ack(message);
           });
       });
   }

当我不得不处理 web-hooks 的新来源时,我使用 consumeTask(queue)。我还用它来初始化我的应用程序,其中包含数据库中所有已知的来源。