在 Node JS 应用程序中使用 Kafka Consumer 来指示计算已经完成

using Kafka Consumer in Node JS app to indicate computations have been made

所以我的问题可能涉及一些基于应用程序性质的头脑风暴。

我有一个向 Kafka 发送消息的 Node JS 应用程序。例如,每次用户点击一个页面时,Kafka 应用程序都会根据这次访问运行一次计算。然后我在同一个实例中想要在通过我的 Kafka 消息触发它后检索它。到目前为止,此计算存储在 Cassandra 数据库中。问题是,如果我们在计算完成之前尝试从 Cassandra 读取数据,那么我们将不会从数据库中查询任何内容(尚未插入密钥)并且不会 return 任何内容(错误),或者可能是计算过时了。到目前为止,这是我的代码。

router.get('/:slug', async (req, res) =>{

Producer = kafka.Producer


KeyedMessage = kafka.KeyedMessage
  client = new kafka.KafkaClient()



producer = new Producer(client)



km = new KeyedMessage('key', 'message')
  kafka_message = JSON.stringify({ id: req.session.session_id.toString(), url: arbitrary_url })
  payloads = [
    { topic: 'MakeComputationTopic', messages: kafka_message}
  ]; 
const clientCass = new cassandra.Client({
contactPoints: ['127.0.0.1:9042'],
localDataCenter: 'datacenter1', // here is the change required
keyspace: 'computation_space',
authProvider: new auth.PlainTextAuthProvider('cassandra', 'cassandra')
});



const query = 'SELECT * FROM computation  WHERE id = ?';




clientCass.execute(query, [req.session.session_id],{ hints : ['int'] })
  .then(result => console.log('User with email %s', result.rows[0].computations))
  .catch((message) => {
    console.log('Could not find key')
  });


}

首先,我想到了 async 和 await,但被排除了,因为这不会停止过时的计算。

其次,我考虑让我的应用程序休眠,但似乎这种方式会降低我的应用程序速度。

我可能决定使用 Kafka Consumer(在我的 node-js 中)来消费一条消息,表明现在可以安全地查看 Cassandra table。

例如(使用卡夫卡节点)

consumer.on('message', function (message) {
    clientCass.execute(query, [req.session.session_id],{ hints : ['int'] })
  .then(result => console.log('User with computation%s', result.rows[0].computations))
  .catch((message) => {
    console.log('Could not find key')
  });
}); 

这种方法虽然更好,但似乎有点不对劲,因为每次用户单击页面时我都必须创建一个消费者,而且我只关心它被发送了 1 条消息。

我想知道我应该如何应对这个挑战?我可能错过了一个场景,还是有办法使用 kafka-node 来解决这个问题?我也在考虑做一个 while 循环等待承诺成功并且计算不会过时(比较缓存中的值)

This approach while better seems a bit off since I will have to make a consumer every time a user clicks on a page, and I only care about it being sent 1 message.

我也会得出同样的结论。 Cassandra 不是为这些用例设计的。数据库最终是一致的。如果您将某些东西组合在一起,您当前的方法目前可能有效,但一旦您拥有 Cassandra 集群,肯定会导致未定义的行为。特别是当你更新条目时。

计算中的 id table 是您的分区键。这意味着一旦你有了一个集群,Cassandra 就会通过 id 分发数据。看起来它只包含一行。这是一种非常低效的 Cassandra 建模方法 tables.

您的用例看起来像是会话存储或缓存的用例。 Redis or LevelDB 非常适合这类用例。任何其他键值存储也可以完成这项工作。

为什么不将结果写入另一个主题,让另一个应用程序读取该主题并将结果写入数据库。这样你就不需要保持任何状态。完成后结果将在主题中。它看起来像这样:

传入数据 -> 第一个 kafka 主题 -> 计算应用程序 -> 第二个 kafka 主题 -> 另一个应用程序将其写入数据库 <- 另一个应用程序定期读取数据。

如果它在那里,它就在那里,因此还没有完成。