重新连接时始终从 Kafka 检索最新消息

Retrieve always latest messages from Kafka on reconnection

我正在编写一段代码,需要每隔几毫秒从 Kafka 读取数百条消息。我正在使用 C++ 和 librdkafka。当我的程序停止然后重新启动时,它不需要恢复所有丢失的消息,因为它已停止,但它需要始终从发送的 最新 消息中读取。

据我所知,我可以通过 enable.auto.commitauto.offset.reset 来管理消费者补偿。但是,后者仅在没有提交的偏移量时才有用,而前者让我自己管理要存储的偏移量。

玩这两个值我发现如果我将 enable.auto.commit 设置为 false,而不提交任何偏移量,并且将 auto.offset.reset 设置为 latest 它似乎总是检索最新讯息;但是这个解决方案有多干净?

我担心的是,如果在两次消费者投票之间发送了 2 条消息,而我的消费者只接收最新的消息,或者如果没有发送消息,它会不断读取相同的消息。两者都是不受欢迎的行为。

还有一个想法是清除消费组偏移量或者向前看,但是librdkafka中的seek方法好像不能用,找不到管理消费组的方法..

如何使用 librdkafka 始终读取来自 Kafka 的最新消息?

最后我通过管理自己的重新平衡回调解决了问题。当有新的消费者加入或离开群组时,该回调将始终执行。

The rebalance callback is responsible for updating librdkafka's assignment set based on the two events: RdKafka::ERR__ASSIGN_PARTITIONS and RdKafka::ERR__REVOKE_PARTITIONS.

因此,在重新平衡回调中,我迭代了 TopicPartitions,以便使用最新的偏移量将它们分配给消费者。代码片段是这样的:

class SeekEndRebalanceCb : public RdKafka::RebalanceCb {
  public:
  void rebalance_cb (RdKafka::KafkaConsumer *consumer, RdKafka::ErrorCode err, std::vector<RdKafka::TopicPartition*> &partitions) {
    if (err == RdKafka::ERR__ASSIGN_PARTITIONS) {
      for (auto partition = partitions.begin(); partition != partitions.end(); partition++) {
        (*partition)->set_offset(RdKafka::Topic::OFFSET_END);
        consumer->assign(partitions);
      }
    } else if (err == RdKafka::ERR__REVOKE_PARTITIONS) {
      consumer->unassign();
    } else {
      std::cerr << "Rebalancing error: " << RdKafka::err2str(err) << std::endl;
    }
  }
};

为了使用那个回调,我将把它设置给消费者。

SeekEndRebalanceCb ex_rb_cb;
if (consumer->set("rebalance_cb", &ex_rb_cb, errstr) != RdKafka::Conf::CONF_OK) {
  std::cerr << errstr << std::endl;
  return false;
}

consumer->assign(partitions) 应该在循环结束后调用 for

class SeekEndRebalanceCb : public RdKafka::RebalanceCb {
  public:
  void rebalance_cb (RdKafka::KafkaConsumer *consumer, RdKafka::ErrorCode err, std::vector<RdKafka::TopicPartition*> &partitions) {
    if (err == RdKafka::ERR__ASSIGN_PARTITIONS) {
       for (auto partition = partitions.begin(); partition != partitions.end(); partition++) 
           (*partition)->set_offset(RdKafka::Topic::OFFSET_END);
       consumer->assign(partitions);
    } else if (err == RdKafka::ERR__REVOKE_PARTITIONS) {
      consumer->unassign();
    } else {
      std::cerr << "Rebalancing error: " << RdKafka::err2str(err) << std::endl;
    }
  }
};