Node.js EventEmitter 事件不共享事件循环
Node.js EventEmitter events not sharing event loop
也许潜在的问题是我正在使用的 node-kafka 模块是如何实现的,但也许不是,所以我们开始吧...
使用 node-kafa 库时,我遇到了订阅 consumer.on('message')
事件的问题。该库使用标准 events
模块,所以我认为这个问题可能足够通用。
我的实际代码结构又大又复杂,所以这里有一个基本布局的伪例子来突出我的问题。 (注意:此代码片段未经测试,所以我这里可能有错误,但无论如何这里的语法都没有问题)
var messageCount = 0;
var queryCount = 0;
// Getting messages via some event Emitter
consumer.on('message', function(message) {
message++;
console.log('Message #' + message);
// Making a database call for each message
mysql.query('SELECT "test" AS testQuery', function(err, rows, fields) {
queryCount++;
console.log('Query #' + queryCount);
});
})
我在这里看到的是,当我启动我的服务器时,有 100,000 条左右积压的消息是 kafka 想要给我的,它通过事件发射器这样做。所以我开始收到消息。获取并记录所有消息大约需要 15 秒。
假设 mysql 查询相当快,这是我希望看到的输出结果:
Message #1
Message #2
Message #3
...
Message #500
Query #1
Message #501
Message #502
Query #2
... and so on in some intermingled fashion
我希望这是因为我的第一个 mysql 结果应该很快准备好,我希望结果在事件循环中轮流处理响应。我实际得到的是:
Message #1
Message #2
...
Message #100000
Query #1
Query #2
...
Query #100000
在能够处理 mysql 响应之前,我收到了每条消息。所以我的问题是,为什么?为什么在所有消息事件完成之前我无法获得单个数据库结果?
另一个注意事项:我在 node-kafka 中的 .emit('message')
和我的代码中的 mysql.query()
处设置了一个断点,我正在按回合制击中它们。因此,在进入我的事件订阅者之前,所有 100,000 次发射似乎都没有预先堆叠起来。所以我对这个问题的第一个假设就出现了。
想法和知识将不胜感激:)
node-kafka
驱动程序使用相当宽松的缓冲区大小 (1M),这意味着它将从 Kafka 获取适合缓冲区的尽可能多的消息。如果服务器积压,并且根据消息大小,这可能意味着一个请求会收到(数万)条消息。
因为 EventEmitter 是同步的(它不使用 Node 事件循环),这意味着驱动程序将向其侦听器发出(数万个)数千个事件,并且由于它是同步的,它不会屈服于节点事件循环,直到所有消息都已传递。
我不认为您可以解决大量的事件传递问题,但我不认为具体的事件传递有问题。更可能的问题是为每个事件启动异步操作(在本例中为 MySQL 查询),这可能会使数据库充满查询。
一种可能的解决方法是使用队列而不是直接从事件处理程序执行查询。例如,使用 async.queue
可以限制并发(异步)任务的数量。队列的 "worker" 部分将执行 MySQL 查询,在事件处理程序中,您只需将消息推送到队列中。
也许潜在的问题是我正在使用的 node-kafka 模块是如何实现的,但也许不是,所以我们开始吧...
使用 node-kafa 库时,我遇到了订阅 consumer.on('message')
事件的问题。该库使用标准 events
模块,所以我认为这个问题可能足够通用。
我的实际代码结构又大又复杂,所以这里有一个基本布局的伪例子来突出我的问题。 (注意:此代码片段未经测试,所以我这里可能有错误,但无论如何这里的语法都没有问题)
var messageCount = 0;
var queryCount = 0;
// Getting messages via some event Emitter
consumer.on('message', function(message) {
message++;
console.log('Message #' + message);
// Making a database call for each message
mysql.query('SELECT "test" AS testQuery', function(err, rows, fields) {
queryCount++;
console.log('Query #' + queryCount);
});
})
我在这里看到的是,当我启动我的服务器时,有 100,000 条左右积压的消息是 kafka 想要给我的,它通过事件发射器这样做。所以我开始收到消息。获取并记录所有消息大约需要 15 秒。
假设 mysql 查询相当快,这是我希望看到的输出结果:
Message #1
Message #2
Message #3
...
Message #500
Query #1
Message #501
Message #502
Query #2
... and so on in some intermingled fashion
我希望这是因为我的第一个 mysql 结果应该很快准备好,我希望结果在事件循环中轮流处理响应。我实际得到的是:
Message #1
Message #2
...
Message #100000
Query #1
Query #2
...
Query #100000
在能够处理 mysql 响应之前,我收到了每条消息。所以我的问题是,为什么?为什么在所有消息事件完成之前我无法获得单个数据库结果?
另一个注意事项:我在 node-kafka 中的 .emit('message')
和我的代码中的 mysql.query()
处设置了一个断点,我正在按回合制击中它们。因此,在进入我的事件订阅者之前,所有 100,000 次发射似乎都没有预先堆叠起来。所以我对这个问题的第一个假设就出现了。
想法和知识将不胜感激:)
node-kafka
驱动程序使用相当宽松的缓冲区大小 (1M),这意味着它将从 Kafka 获取适合缓冲区的尽可能多的消息。如果服务器积压,并且根据消息大小,这可能意味着一个请求会收到(数万)条消息。
因为 EventEmitter 是同步的(它不使用 Node 事件循环),这意味着驱动程序将向其侦听器发出(数万个)数千个事件,并且由于它是同步的,它不会屈服于节点事件循环,直到所有消息都已传递。
我不认为您可以解决大量的事件传递问题,但我不认为具体的事件传递有问题。更可能的问题是为每个事件启动异步操作(在本例中为 MySQL 查询),这可能会使数据库充满查询。
一种可能的解决方法是使用队列而不是直接从事件处理程序执行查询。例如,使用 async.queue
可以限制并发(异步)任务的数量。队列的 "worker" 部分将执行 MySQL 查询,在事件处理程序中,您只需将消息推送到队列中。