Kafka Consumer - 轮询行为
Kafka Consumer - Poll behaviour
我在尝试针对 KafkaConsumer (>=0.9) 实施解决方案时遇到了一些严重的问题。
让我们假设我有一个函数,它只需要读取来自 kafka 主题的 n 条消息。
例如:getMsgs(5)
--> 获取主题中接下来的 5 条 kafka 消息。
所以,我有一个看起来像这样的循环。使用实际正确的参数进行编辑。在这种情况下,消费者的 max.poll.records
参数被设置为 1,因此实际循环只迭代了一次。不同的消费者(其中一些通过许多消息进行迭代)共享一个抽象的父亲(这个),这就是它以这种方式编码的原因。 numMss
部分是针对该消费者的临时设置。
for (boolean exit= false;!exit;)
{
Records = consumer.poll(config.pollTime);
for (Record r:records)
{
processRecord(r); //do my things
numMss++;
if (numMss==maximum) //maximum=5
{
exit=true;
break;
}
}
}
考虑到这一点,问题是 poll() 方法可以获得超过 5 条消息。例如,如果它收到 10 条消息,我的代码将永远忘记其他 5 条消息,因为 Kafka 会认为它们已经被消费了。
我尝试提交偏移量但似乎不起作用:
consumer.commitSync(Collections.singletonMap(partition,
new OffsetAndMetadata(record.offset() + 1)));
即使使用偏移配置,每当我再次启动消费者时,它不会从第 6 条消息开始(记住,我只想要 5 条消息),但是从 11th 开始(因为第一次轮询消耗了 10 条消息)。
是否有任何解决方案,或者也许(最肯定)我遗漏了什么?
提前致谢!!
设置auto.offset.reset属性为"latest"。然后尝试消费,你会从提交的偏移量中获取消费记录。
或者您在轮询之前使用 consumer.seek(TopicPartition, offset) api。
您是否通过将 enable.auto.commit 设置为 false 来禁用自动提交。如果您想手动提交偏移量,则需要禁用它。如果没有,下一次调用 poll() 将自动提交您从上一次 poll() 收到的消息的最新偏移量。
从 Kafka 0.9 开始,auto.offset.reset 参数名称已更改;
当 Kafka 中没有初始偏移量或服务器上不再存在当前偏移量时(例如,因为该数据已被删除)怎么办:
earliest: automatically reset the offset to the earliest offset
latest: automatically reset the offset to the latest offset
none: throw exception to the consumer if no previous offset is found for the consumer's group
anything else: throw exception to the consumer.
您可以将 max.poll.records
设置为任何您喜欢的数字,这样您最多可以在每次投票中获得那么多条记录。
对于您在此问题中陈述的用例,您不必自己明确提交偏移量。您可以将 enable.auto.commit
设置为 true
并将 auto.offset.reset
设置为 earliest
这样当没有消费者 group.id
时它就会启动(换句话说,当您关于第一次从分区开始读取)。一旦你有一个 group.id 和一些存储在 Kafka 中的消费者偏移量,如果你的 Kafka 消费者进程死亡,它将从最后提交的偏移量继续,因为这是默认行为,因为当消费者启动时,它会首先寻找是否有是任何已提交的偏移量,如果是,将从上次提交的偏移量继续,并且 auto.offset.reset
不会 开始。
我在尝试针对 KafkaConsumer (>=0.9) 实施解决方案时遇到了一些严重的问题。
让我们假设我有一个函数,它只需要读取来自 kafka 主题的 n 条消息。
例如:getMsgs(5)
--> 获取主题中接下来的 5 条 kafka 消息。
所以,我有一个看起来像这样的循环。使用实际正确的参数进行编辑。在这种情况下,消费者的 max.poll.records
参数被设置为 1,因此实际循环只迭代了一次。不同的消费者(其中一些通过许多消息进行迭代)共享一个抽象的父亲(这个),这就是它以这种方式编码的原因。 numMss
部分是针对该消费者的临时设置。
for (boolean exit= false;!exit;)
{
Records = consumer.poll(config.pollTime);
for (Record r:records)
{
processRecord(r); //do my things
numMss++;
if (numMss==maximum) //maximum=5
{
exit=true;
break;
}
}
}
考虑到这一点,问题是 poll() 方法可以获得超过 5 条消息。例如,如果它收到 10 条消息,我的代码将永远忘记其他 5 条消息,因为 Kafka 会认为它们已经被消费了。
我尝试提交偏移量但似乎不起作用:
consumer.commitSync(Collections.singletonMap(partition,
new OffsetAndMetadata(record.offset() + 1)));
即使使用偏移配置,每当我再次启动消费者时,它不会从第 6 条消息开始(记住,我只想要 5 条消息),但是从 11th 开始(因为第一次轮询消耗了 10 条消息)。
是否有任何解决方案,或者也许(最肯定)我遗漏了什么?
提前致谢!!
设置auto.offset.reset属性为"latest"。然后尝试消费,你会从提交的偏移量中获取消费记录。
或者您在轮询之前使用 consumer.seek(TopicPartition, offset) api。
您是否通过将 enable.auto.commit 设置为 false 来禁用自动提交。如果您想手动提交偏移量,则需要禁用它。如果没有,下一次调用 poll() 将自动提交您从上一次 poll() 收到的消息的最新偏移量。
从 Kafka 0.9 开始,auto.offset.reset 参数名称已更改;
当 Kafka 中没有初始偏移量或服务器上不再存在当前偏移量时(例如,因为该数据已被删除)怎么办:
earliest: automatically reset the offset to the earliest offset
latest: automatically reset the offset to the latest offset
none: throw exception to the consumer if no previous offset is found for the consumer's group
anything else: throw exception to the consumer.
您可以将 max.poll.records
设置为任何您喜欢的数字,这样您最多可以在每次投票中获得那么多条记录。
对于您在此问题中陈述的用例,您不必自己明确提交偏移量。您可以将 enable.auto.commit
设置为 true
并将 auto.offset.reset
设置为 earliest
这样当没有消费者 group.id
时它就会启动(换句话说,当您关于第一次从分区开始读取)。一旦你有一个 group.id 和一些存储在 Kafka 中的消费者偏移量,如果你的 Kafka 消费者进程死亡,它将从最后提交的偏移量继续,因为这是默认行为,因为当消费者启动时,它会首先寻找是否有是任何已提交的偏移量,如果是,将从上次提交的偏移量继续,并且 auto.offset.reset
不会 开始。