关于在 node.js 中设计 Kafka 消费者并使其幂等的问题

Question on designing a Kafka Consumer in node.js and making it idempotent

我们正在尝试编写一个消费者,它会订阅一个特定的主题,并在满足数据中的几个条件时进行一些处理。然而,处理中的一个操作只能执行一次,为了简单起见,我们将其视为一个非幂等的 POST http 请求。

以下是一些其他注意事项:-

我们认为要使这个消费者幂等,我们也许可以做这样的事情:-

For Every Message
    Check if message was processed
    try:
        If !processed
            Do Processing (http POST included here)
    catch for errors:
            Do error processing
    finally:
        Mark message as processed

“将消息标记为已处理”基本上是将一些细节捕获到关系数据库(如 Postgres)、偏移量、时间戳和其他一些细节,以确保我们捕获了允许我们识别的键独一无二的记录

  1. 以上看起来是否适合使消费者幂等?
  2. 您还可以建议哪些其他替代方案性能更好?

除了上面的内容之外,还有一些关于上述场景中数据库处理最佳实践的问题:-

  1. 假设我有 3 个 k8s 节点,每个节点有 3 个消费者 pods 运行ning,基本上给了 9 个单线程 kafka 消费者。这样理解对吗?

  2. 既然每个线程都将执行 DB Inserts/Reads,那么在 Pool 和 Client 之间使用什么更好(假设 node-postgres 库)?

  3. 看来,如果我们在一天开始时打开一个客户端连接并让它一直存在到一天结束,它应该对我们有用。这是一个好方法还是一个非常糟糕的设计?

  4. 如果我们对来自这 9 个消费者的每条消息进行处理,我们是否通过使用池获得任何好处 运行ning。

附加假设:-

感谢您的耐心阅读。对于这个 post.

的长度,我们深表歉意
  1. 以上看起来是否适合使消费者幂等?

Yes, from the POV of Idempotency, your code looks good. Since you're working with Kafka consumers, you don't need an exclusive for loop for message processing. Consumers are invoked on each message arrival. Your psuedo-code should look like this:

Check if message was processed
try:
    If !processed
        Do Processing (http POST included here)
catch for errors:
        Do error processing
finally:
    Mark message as processed
  1. 您还可以建议哪些其他替代方案性能更好?

Your code misses out on an important aspect which is concurrent duplicate messages. Say, for example, two messages are somehow produced at the same time from the producer (which is actually an error at producer's end) and the message should be processed only once. The consumer starts processing for both messages. At this point If !processed, both consumers see the same state which is not processed and both of them proceed to Do Processing. You can avoid such scenarios by acquiring a lock on some an id by which you can tell if a message is duplicate or not. Since you're already using Postgres, you could look into pg_advisory_locks. So now, your pseudo-code will now look something like:

Check if message was processed
try:
    acquire_lock(message.uniqId)    //pg_advisory_lock
    If !processed
        Do Processing (http POST included here)
catch for errors:
    if error is lock_already_acquired_by_other_process
        return as duplicate processor
    else
        Do error processing
finally:
    Mark message as processed
    release lock

We can still do some improvements. The above code doesn't handle failure scenarios where we would like to have retries. There are various ways to achieve this. Oh wait, you're using Kafka. Why not publish the message which has failed in processing (obviously not those which were duplicates) in the same Kafka topic after some delay and have some counter in the message object to check how many times has this message been processed. We would certainly want to retry only a limited number of times, so each time we're processing the message we can check the counter we set previously in the message object to control the number of retries. So far so good, but what about the messages which are failing even after the fixed number of retries. For such cases, you'd like to have a DLQ (dead lettered queue) which holds such messages along with some error message until you have looked at them manually and fixed the errors.

This sounds a lot of code to write. We've another great news. There are libraries available which you can leverage to achieve all these. One such library is bull.

  1. 假设我有 3 个 k8s 节点,每个节点有 3 个消费者 pods 运行,基本上提供了 9 个单线程 kafka 消费者。这样理解对吗?

Yeah. As far as I understand this.

  1. 既然每个线程都将执行 DB Inserts/Reads,那么在 Pool 和 Client 之间使用什么更好(假设是 node-postgres 库)?

Using a pool would be advisable since you'd also aim to achieve faster processing. With connection pooling you can do stuffs like firing multiple queries at the same time without queuing them up, utilizing any underlying library which uses parallel execution, etc. Of course, we shouldn't fill up our memory with connections so a tuned number of connections in the pool according to the pod's memory is advisable.

  1. 看来,如果我们在一天开始时打开一个客户端连接并让它一直存在到一天结束,它应该对我们有用。这是一个好方法还是一个非常糟糕的设计?

I can't understand correctly what you're trying to do here but I'd for connection pooling.

  1. 如果我们对来自这 9 个消费者的每条消息进行处理,我们是否通过使用池获得任何好处 运行。

Yes. Apart from the benefits already mentioned in point 4, you get better resource utilization of your k8s pods (again that depends on whether if 9 consumers are optimal according to the message incoming rate).