卡夫卡 0.9 消费者
Kafka 0.9 Consumer
如果我回到主题的开头,我可能有数百万条消息,我可能想要分批处理这些消息,而不是一次全部处理,并在每批之后提交偏移量。鉴于轮询似乎在当前偏移量之后获取所有内容并在轮询返回的末尾提交偏移量,我该怎么做?
您可以使用 max.partition.fetch.bytes
为来自每个分区的数据设置一个上限,唯一的缺点是记录只能那么大,所以如果您不知道记录可以有多大也许这不是最好的解决方案。
从 Kafka 返回的每条记录都有主题、分区和该分区上的偏移量,因此当您处理整个批次时(或者您可能想在处理完每条消息后执行此操作,这样如果您的消费者出现故障,您就不会'处理消息两次)您可以同步或异步提交偏移量。
如果我回到主题的开头,我可能有数百万条消息,我可能想要分批处理这些消息,而不是一次全部处理,并在每批之后提交偏移量。鉴于轮询似乎在当前偏移量之后获取所有内容并在轮询返回的末尾提交偏移量,我该怎么做?
您可以使用 max.partition.fetch.bytes
为来自每个分区的数据设置一个上限,唯一的缺点是记录只能那么大,所以如果您不知道记录可以有多大也许这不是最好的解决方案。
从 Kafka 返回的每条记录都有主题、分区和该分区上的偏移量,因此当您处理整个批次时(或者您可能想在处理完每条消息后执行此操作,这样如果您的消费者出现故障,您就不会'处理消息两次)您可以同步或异步提交偏移量。