kafka-node - 将参数传递给异步函数 consumer.on();

kafka-node - pass arguments to async function consumer.on();

我正在为我的 node.js 服务器使用 kafka-node - 用于连接到 kafka 主题。在他们的API里面有一个回调函数:

consumer.on('message', function (message) {
     console.log(message);
});

用于在消息到达时使用它们。

我想知道是否以及如何向此回调函数添加参数,因为消息到达时我不是调用它的人。

更具体地说,我想发送预期的ID并将其与消费的消息ID进行比较:

consumer.on('message', function (message, id) {
    if (id == message.id) 
        console.log(message);
});

编辑: 这是一个较大代码的片段。 完整流程是:

  1. 用户从 node.js 服务器请求 http 请求。
  2. GET HTTP 请求在 node.js 服务器中触发。
  3. kafka 生产者向外部系统发送一些消息(具有唯一 ID)并等待响应。
  4. kafka 消费者收到响应。将收到的消息 ID 与预期的 ID 进行比较 - 如果相等 - return 消息作为 http 响应。

考虑可以并行接收多个 http get 请求。

您不能向回调添加参数。但我相信你可以在没有它的情况下实现你想要做的事情 - 只需引用闭包范围内的变量即可。

例如,如果您有:

var id = 1001;

consumer.on('message', function(message) {
  if (id == message.id) console.log(message);
});

我相信这会如你所愿。