依次执行在节点应用程序中接收到的 webhooks
Sequentially execute webhooks received in node application
我有一个使用 koa 的节点应用程序。它从特定资源上的外部应用程序接收 webhooks。
为了说明,让 webhook 向我发送 POST 请求这种类型的对象:
{
'resource_id':'<SomeID>',
'resource_origin':'<SomeResourceOrigin>',
'value' : '<SomeValue>'
}
我想按顺序执行来自同一来源的任何资源,以避免与我的执行相关的资源不同步。
我正在考虑使用数据库作为锁并使用 cron 为同源的每个资源顺序执行我的进程。
但我不确定这是最有效的方法。
所以我的问题是:
您是否知道一些 method/package/service 允许我使用可以为每个来源实施的全局队列,以确保来自同一来源的资源将同步执行,而无需顺序处理所有 webhooks?如果不使用数据库就更好了
如果我是你,我会首先序列化所有 webhook 的处理。换句话说,我建议你一次处理一个,不管它们来自哪里。在您的 nodejs 应用程序中使用一个简单的队列。
(一旦你确信它工作正常,你就可以根据来源序列化它们。)
首先,构建您的函数(我们称它为 handleOneWebhook()
)以将传入的 webhook 处理为 Promise 或异步函数。然后您可以使用带有此大纲的代码调用它们。
let busy= false
async function handleManyWebhooks (queue) {
if (busy) return
busy = true
while (queue.length > 0) {
const item = queue.shift()
await handleOneWebhook (item)
}
busy = false
}
您传递给 handleManyWebhooks
的 queue
是一个简单的数组,其中每个元素都是来自 POST 请求的对象。您将它用作队列:push()
每个对象将其放入队列,shift()
将其删除。
然后,每当您收到一个 webhook POST 对象时,您就会使用带有此大纲的代码。
const queue = []
...
function handlePostObject (postObject) {
queue.push(postObject)
handleManyWebooks (queue)
}
即使您为每个传入对象调用 handleManyWebhooks 一次,busy
标志也确保它一次只处理一个对象。
请注意,这是一个非常简单的解决方案。一旦你让它正常工作,两个可能的改进就会出现。
使用比简单数组更有效的队列。 shift()
不是很快
为每个单独的源创建一个具有自己的 busy
标志的单独队列对象。然后,您将能够并行处理来自不同来源的 webhook,同时仍然序列化来自每个来源的 webhook 流。
我决定使用的解决方案
post 讨论的小摘要
作为 Ivan Rubinson 让我知道我的问题只是 producer-consumer problem.
所以我最终选择了使用RabbitMQ because I have a huge amount of webhook to process. For peoples having a small amount of request to process and do not want use external tools 确实是解决问题的好方法
解决方案设计
我终于安装并配置了 RabbitMQ 服务器,然后我为 web-hooks 的每个来源创建了一个队列。
制作人
在生产者方面,当我收到 web-hook 数据时,我将一条消息发送到与我的 web-hook 来源相对应的队列,其中包含处理实际行 ID 所需的序列化信息数据库使消息尽可能轻。
消费者
在消费者方面,我为每个源队列创建一个消费者函数,并将获取策略设置为一个,以便在每个队列中一个一个地处理消息最后,我将通道策略设置为在发送下一个之前等待确认消息信息 。使用此配置,消费者会逐条处理消息并解决最初的问题。
实施
制作人
async function create(){
await amqp.connect(RBMQ_CONNECTION_STRING).then(async (conn)=>{
await conn.createChannel().then(async (ch)=>{
global.channel_publisher=ch;
});
});
}
async function sendtask(queue,task){
if(!global.channel_publisher){
await create();
}
global.channel_publisher.assertQueue(queue).then((ok)=>{
global.channel_publisher.sendToQueue(queue, Buffer.from(task));
});
}
我在收到 web-hook
的地方使用 sendtask(queue,task)
功能
消费者
async function create(){
await amqp.connect(RBMQ_CONNECTION_STRING).then(async (conn)=>{
await conn.createChannel().then(async (ch)=>{
ch.prefetch(1);
global.channel_consumer=ch;
});
});
}
async function consumeTask(queue){
if(!global.channel_consumer){
await create();
}
global.channel_consumer.assertQueue(queue).then((ok)=>{
global.channel_consumer.consume(queue,(message)=>{
const args=message.content.toString().split(';');
await processWebhooks(args);
global.channel_consumer.ack(message);
});
});
}
当我不得不处理 web-hooks 的新来源时,我使用 consumeTask(queue)
。我还用它来初始化我的应用程序,其中包含数据库中所有已知的来源。
我有一个使用 koa 的节点应用程序。它从特定资源上的外部应用程序接收 webhooks。
为了说明,让 webhook 向我发送 POST 请求这种类型的对象:
{
'resource_id':'<SomeID>',
'resource_origin':'<SomeResourceOrigin>',
'value' : '<SomeValue>'
}
我想按顺序执行来自同一来源的任何资源,以避免与我的执行相关的资源不同步。
我正在考虑使用数据库作为锁并使用 cron 为同源的每个资源顺序执行我的进程。
但我不确定这是最有效的方法。
所以我的问题是:
您是否知道一些 method/package/service 允许我使用可以为每个来源实施的全局队列,以确保来自同一来源的资源将同步执行,而无需顺序处理所有 webhooks?如果不使用数据库就更好了
如果我是你,我会首先序列化所有 webhook 的处理。换句话说,我建议你一次处理一个,不管它们来自哪里。在您的 nodejs 应用程序中使用一个简单的队列。
(一旦你确信它工作正常,你就可以根据来源序列化它们。)
首先,构建您的函数(我们称它为 handleOneWebhook()
)以将传入的 webhook 处理为 Promise 或异步函数。然后您可以使用带有此大纲的代码调用它们。
let busy= false
async function handleManyWebhooks (queue) {
if (busy) return
busy = true
while (queue.length > 0) {
const item = queue.shift()
await handleOneWebhook (item)
}
busy = false
}
您传递给 handleManyWebhooks
的 queue
是一个简单的数组,其中每个元素都是来自 POST 请求的对象。您将它用作队列:push()
每个对象将其放入队列,shift()
将其删除。
然后,每当您收到一个 webhook POST 对象时,您就会使用带有此大纲的代码。
const queue = []
...
function handlePostObject (postObject) {
queue.push(postObject)
handleManyWebooks (queue)
}
即使您为每个传入对象调用 handleManyWebhooks 一次,busy
标志也确保它一次只处理一个对象。
请注意,这是一个非常简单的解决方案。一旦你让它正常工作,两个可能的改进就会出现。
使用比简单数组更有效的队列。
shift()
不是很快为每个单独的源创建一个具有自己的
busy
标志的单独队列对象。然后,您将能够并行处理来自不同来源的 webhook,同时仍然序列化来自每个来源的 webhook 流。
我决定使用的解决方案
post 讨论的小摘要
作为 Ivan Rubinson 让我知道我的问题只是 producer-consumer problem.
所以我最终选择了使用RabbitMQ because I have a huge amount of webhook to process. For peoples having a small amount of request to process and do not want use external tools
解决方案设计
我终于安装并配置了 RabbitMQ 服务器,然后我为 web-hooks 的每个来源创建了一个队列。
制作人
在生产者方面,当我收到 web-hook 数据时,我将一条消息发送到与我的 web-hook 来源相对应的队列,其中包含处理实际行 ID 所需的序列化信息数据库使消息尽可能轻。
消费者
在消费者方面,我为每个源队列创建一个消费者函数,并将获取策略设置为一个,以便在每个队列中一个一个地处理消息最后,我将通道策略设置为在发送下一个之前等待确认消息信息 。使用此配置,消费者会逐条处理消息并解决最初的问题。
实施
制作人
async function create(){
await amqp.connect(RBMQ_CONNECTION_STRING).then(async (conn)=>{
await conn.createChannel().then(async (ch)=>{
global.channel_publisher=ch;
});
});
}
async function sendtask(queue,task){
if(!global.channel_publisher){
await create();
}
global.channel_publisher.assertQueue(queue).then((ok)=>{
global.channel_publisher.sendToQueue(queue, Buffer.from(task));
});
}
我在收到 web-hook
的地方使用sendtask(queue,task)
功能
消费者
async function create(){
await amqp.connect(RBMQ_CONNECTION_STRING).then(async (conn)=>{
await conn.createChannel().then(async (ch)=>{
ch.prefetch(1);
global.channel_consumer=ch;
});
});
}
async function consumeTask(queue){
if(!global.channel_consumer){
await create();
}
global.channel_consumer.assertQueue(queue).then((ok)=>{
global.channel_consumer.consume(queue,(message)=>{
const args=message.content.toString().split(';');
await processWebhooks(args);
global.channel_consumer.ack(message);
});
});
}
当我不得不处理 web-hooks 的新来源时,我使用 consumeTask(queue)
。我还用它来初始化我的应用程序,其中包含数据库中所有已知的来源。