如何使用 php-rdkafka 在 kafka 中确认消费消息?
How to acknowledge consume message in kafka using php-rdkafka?
我正在使用 php-rdkafka 作为 php kafka 客户端。我通过使用 test
成功生成了我的测试消息 group.and 使用以下代码使用消息,
$kafkaConsumer = new RdKafka\Consumer();
$kafkaConsumer->addBrokers("127.0.0.1:9292");
$topic = $kafkaConsumer->newTopic("test");
$topic->consumeStart(0, RD_KAFKA_OFFSET_BEGINNING);
while (true) {
$msg = $topic->consume(0, 1000);
if($msg){
if ($msg->err) {
echo $msg->errstr(), "\n";
break;
} else {
echo $msg->payload, "\n";
}
}
}
但是当我再次尝试在 test
组中设置消息并尝试使用 test
组的消息时,我收到了旧消息和新消息。所以我只想知道如何确认旧消息以便我只能收到新消息而不是旧消息?有人可以对此进行一些改进吗?
我的kafka版本是0.11.0.1
Kafka中确认消费消息的方法是提交它的偏移量。这样,当重新启动您的消费者时,它可以检索最后提交的偏移量并在它停止的地方重新启动。
如评论中所建议,您需要使用RD_KAFKA_OFFSET_STORED
指示消费者检索存储的偏移量。
但您还需要通过设置 group.id
配置来提供组名:
<?php
$conf = new RdKafka\Conf();
// Set the group id. This is required when storing offsets on the broker
$conf->set('group.id', 'myConsumerGroup');
$rk = new RdKafka\Consumer($conf);
$rk->addBrokers("127.0.0.1:9292");
$topicConf = new RdKafka\TopicConf();
$topicConf->set('auto.commit.interval.ms', 100);
// Set where to start consuming messages when there is no initial offset in
// offset store or the desired offset is out of range.
// 'smallest': start from the beginning
$topicConf->set('auto.offset.reset', 'smallest');
$topic = $rk->newTopic("test", $topicConf);
// Start consuming partition 0
$topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);
while (true) {
$message = $topic->consume(0, 120*10000);
switch ($message->err) {
case RD_KAFKA_RESP_ERR_NO_ERROR:
var_dump($message);
break;
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
echo "No more messages; will wait for more\n";
break;
case RD_KAFKA_RESP_ERR__TIMED_OUT:
echo "Timed out\n";
break;
default:
throw new \Exception($message->errstr(), $message->err);
break;
}
}
?>
我正在使用 php-rdkafka 作为 php kafka 客户端。我通过使用 test
成功生成了我的测试消息 group.and 使用以下代码使用消息,
$kafkaConsumer = new RdKafka\Consumer();
$kafkaConsumer->addBrokers("127.0.0.1:9292");
$topic = $kafkaConsumer->newTopic("test");
$topic->consumeStart(0, RD_KAFKA_OFFSET_BEGINNING);
while (true) {
$msg = $topic->consume(0, 1000);
if($msg){
if ($msg->err) {
echo $msg->errstr(), "\n";
break;
} else {
echo $msg->payload, "\n";
}
}
}
但是当我再次尝试在 test
组中设置消息并尝试使用 test
组的消息时,我收到了旧消息和新消息。所以我只想知道如何确认旧消息以便我只能收到新消息而不是旧消息?有人可以对此进行一些改进吗?
我的kafka版本是0.11.0.1
Kafka中确认消费消息的方法是提交它的偏移量。这样,当重新启动您的消费者时,它可以检索最后提交的偏移量并在它停止的地方重新启动。
如评论中所建议,您需要使用RD_KAFKA_OFFSET_STORED
指示消费者检索存储的偏移量。
但您还需要通过设置 group.id
配置来提供组名:
<?php
$conf = new RdKafka\Conf();
// Set the group id. This is required when storing offsets on the broker
$conf->set('group.id', 'myConsumerGroup');
$rk = new RdKafka\Consumer($conf);
$rk->addBrokers("127.0.0.1:9292");
$topicConf = new RdKafka\TopicConf();
$topicConf->set('auto.commit.interval.ms', 100);
// Set where to start consuming messages when there is no initial offset in
// offset store or the desired offset is out of range.
// 'smallest': start from the beginning
$topicConf->set('auto.offset.reset', 'smallest');
$topic = $rk->newTopic("test", $topicConf);
// Start consuming partition 0
$topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);
while (true) {
$message = $topic->consume(0, 120*10000);
switch ($message->err) {
case RD_KAFKA_RESP_ERR_NO_ERROR:
var_dump($message);
break;
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
echo "No more messages; will wait for more\n";
break;
case RD_KAFKA_RESP_ERR__TIMED_OUT:
echo "Timed out\n";
break;
default:
throw new \Exception($message->errstr(), $message->err);
break;
}
}
?>