重新连接时始终从 Kafka 检索最新消息
Retrieve always latest messages from Kafka on reconnection
我正在编写一段代码,需要每隔几毫秒从 Kafka 读取数百条消息。我正在使用 C++ 和 librdkafka。当我的程序停止然后重新启动时,它不需要恢复所有丢失的消息,因为它已停止,但它需要始终从发送的 最新 消息中读取。
据我所知,我可以通过 enable.auto.commit
和 auto.offset.reset
来管理消费者补偿。但是,后者仅在没有提交的偏移量时才有用,而前者让我自己管理要存储的偏移量。
玩这两个值我发现如果我将 enable.auto.commit
设置为 false
,而不提交任何偏移量,并且将 auto.offset.reset
设置为 latest
它似乎总是检索最新讯息;但是这个解决方案有多干净?
我担心的是,如果在两次消费者投票之间发送了 2 条消息,而我的消费者只接收最新的消息,或者如果没有发送消息,它会不断读取相同的消息。两者都是不受欢迎的行为。
还有一个想法是清除消费组偏移量或者向前看,但是librdkafka中的seek
方法好像不能用,找不到管理消费组的方法..
如何使用 librdkafka 始终读取来自 Kafka 的最新消息?
最后我通过管理自己的重新平衡回调解决了问题。当有新的消费者加入或离开群组时,该回调将始终执行。
The rebalance callback is responsible for updating librdkafka's assignment set based on the two events: RdKafka::ERR__ASSIGN_PARTITIONS and RdKafka::ERR__REVOKE_PARTITIONS.
因此,在重新平衡回调中,我迭代了 TopicPartition
s,以便使用最新的偏移量将它们分配给消费者。代码片段是这样的:
class SeekEndRebalanceCb : public RdKafka::RebalanceCb {
public:
void rebalance_cb (RdKafka::KafkaConsumer *consumer, RdKafka::ErrorCode err, std::vector<RdKafka::TopicPartition*> &partitions) {
if (err == RdKafka::ERR__ASSIGN_PARTITIONS) {
for (auto partition = partitions.begin(); partition != partitions.end(); partition++) {
(*partition)->set_offset(RdKafka::Topic::OFFSET_END);
consumer->assign(partitions);
}
} else if (err == RdKafka::ERR__REVOKE_PARTITIONS) {
consumer->unassign();
} else {
std::cerr << "Rebalancing error: " << RdKafka::err2str(err) << std::endl;
}
}
};
为了使用那个回调,我将把它设置给消费者。
SeekEndRebalanceCb ex_rb_cb;
if (consumer->set("rebalance_cb", &ex_rb_cb, errstr) != RdKafka::Conf::CONF_OK) {
std::cerr << errstr << std::endl;
return false;
}
consumer->assign(partitions) 应该在循环结束后调用 for
class SeekEndRebalanceCb : public RdKafka::RebalanceCb {
public:
void rebalance_cb (RdKafka::KafkaConsumer *consumer, RdKafka::ErrorCode err, std::vector<RdKafka::TopicPartition*> &partitions) {
if (err == RdKafka::ERR__ASSIGN_PARTITIONS) {
for (auto partition = partitions.begin(); partition != partitions.end(); partition++)
(*partition)->set_offset(RdKafka::Topic::OFFSET_END);
consumer->assign(partitions);
} else if (err == RdKafka::ERR__REVOKE_PARTITIONS) {
consumer->unassign();
} else {
std::cerr << "Rebalancing error: " << RdKafka::err2str(err) << std::endl;
}
}
};
我正在编写一段代码,需要每隔几毫秒从 Kafka 读取数百条消息。我正在使用 C++ 和 librdkafka。当我的程序停止然后重新启动时,它不需要恢复所有丢失的消息,因为它已停止,但它需要始终从发送的 最新 消息中读取。
据我所知,我可以通过 enable.auto.commit
和 auto.offset.reset
来管理消费者补偿。但是,后者仅在没有提交的偏移量时才有用,而前者让我自己管理要存储的偏移量。
玩这两个值我发现如果我将 enable.auto.commit
设置为 false
,而不提交任何偏移量,并且将 auto.offset.reset
设置为 latest
它似乎总是检索最新讯息;但是这个解决方案有多干净?
我担心的是,如果在两次消费者投票之间发送了 2 条消息,而我的消费者只接收最新的消息,或者如果没有发送消息,它会不断读取相同的消息。两者都是不受欢迎的行为。
还有一个想法是清除消费组偏移量或者向前看,但是librdkafka中的seek
方法好像不能用,找不到管理消费组的方法..
如何使用 librdkafka 始终读取来自 Kafka 的最新消息?
最后我通过管理自己的重新平衡回调解决了问题。当有新的消费者加入或离开群组时,该回调将始终执行。
The rebalance callback is responsible for updating librdkafka's assignment set based on the two events: RdKafka::ERR__ASSIGN_PARTITIONS and RdKafka::ERR__REVOKE_PARTITIONS.
因此,在重新平衡回调中,我迭代了 TopicPartition
s,以便使用最新的偏移量将它们分配给消费者。代码片段是这样的:
class SeekEndRebalanceCb : public RdKafka::RebalanceCb {
public:
void rebalance_cb (RdKafka::KafkaConsumer *consumer, RdKafka::ErrorCode err, std::vector<RdKafka::TopicPartition*> &partitions) {
if (err == RdKafka::ERR__ASSIGN_PARTITIONS) {
for (auto partition = partitions.begin(); partition != partitions.end(); partition++) {
(*partition)->set_offset(RdKafka::Topic::OFFSET_END);
consumer->assign(partitions);
}
} else if (err == RdKafka::ERR__REVOKE_PARTITIONS) {
consumer->unassign();
} else {
std::cerr << "Rebalancing error: " << RdKafka::err2str(err) << std::endl;
}
}
};
为了使用那个回调,我将把它设置给消费者。
SeekEndRebalanceCb ex_rb_cb;
if (consumer->set("rebalance_cb", &ex_rb_cb, errstr) != RdKafka::Conf::CONF_OK) {
std::cerr << errstr << std::endl;
return false;
}
consumer->assign(partitions) 应该在循环结束后调用 for
class SeekEndRebalanceCb : public RdKafka::RebalanceCb {
public:
void rebalance_cb (RdKafka::KafkaConsumer *consumer, RdKafka::ErrorCode err, std::vector<RdKafka::TopicPartition*> &partitions) {
if (err == RdKafka::ERR__ASSIGN_PARTITIONS) {
for (auto partition = partitions.begin(); partition != partitions.end(); partition++)
(*partition)->set_offset(RdKafka::Topic::OFFSET_END);
consumer->assign(partitions);
} else if (err == RdKafka::ERR__REVOKE_PARTITIONS) {
consumer->unassign();
} else {
std::cerr << "Rebalancing error: " << RdKafka::err2str(err) << std::endl;
}
}
};