kafka 客户端 Api 个问题
kafka Client Api questions
任何人都可以从下面帮助我 queries.I 我正在使用 kafka-clients-0.10.1.1(单节点单代理)
auto.create.topics.enable 的默认值为真。
1.I 正在使用
向主题发送消息
kafkaProdcuer<String,String> producer> producer...
producer.send(new ProducerRecord<String, String>("my- topic","message"));
producer.close();
消费:
kafkaConsumer<String,String> consumer....
consumer.subscribe(Arrays.asList("my-topic"));
ConsumerRecords<String, String> records = consumer.poll(200);
while(true){
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.value());
}
}
问题是当我第一次 运行 消费者时,它没有得到值。我必须再次 运行 生产者和 运行 消费者才能获取值。有时我必须 运行 制片人 3 次。
为什么会这样?
2.) enable.auto.commit=false
如果 enable.auto.commit 属性 为假,同一个消费者是否可以多次读取消息?
3.) 考虑到我在第一个 point.How 中的消费者代码,我可以打破循环我的意思是消费者如何知道它已经阅读了所有消息然后调用 consumer.close()
1) 您是否总是在消费者中使用相同的 group.id?你是先生产再消费吗?这可能与消费群体和抵消管理有关。请参阅 。
2) 不确定您的意思是有意还是无意地重复阅读。只要消息未因主题保留政策而被删除,您始终可以再次阅读同一消息以寻找该位置。如果您的意思是不小心,自动提交设置为 false 仅意味着消费者不会为您提交偏移量,您必须手动调用 commitSync() 或 commitAsync() 来完成。在任何情况下,您的消费者仍有可能在提交之前处理消息并崩溃,在这种情况下,当消费者恢复时,它将再次读取那些已处理但未提交的消息。如果你想要 exactly once 语义,你必须做其他事情,比如用处理过的消息自动存储偏移量。
3) 正如Lhfcws提到的,在流中没有像"all messages"这样的概念。您可以做的一些事情(技巧)是:
- 您可以检查 poll 返回的记录列表是否为空,并且在经过一些配置的次数后中断循环并退出。
- 如果消息是有序的(您正在从单个分区读取),您可以发送一种特殊的 END_OF_DATA 消息,当您看到它时,您将关闭消费者。
- 你可以让消费者读取一些消息然后退出,下次它会从上次提交的偏移量继续。
任何人都可以从下面帮助我 queries.I 我正在使用 kafka-clients-0.10.1.1(单节点单代理)
auto.create.topics.enable 的默认值为真。
1.I 正在使用
向主题发送消息 kafkaProdcuer<String,String> producer> producer...
producer.send(new ProducerRecord<String, String>("my- topic","message"));
producer.close();
消费:
kafkaConsumer<String,String> consumer....
consumer.subscribe(Arrays.asList("my-topic"));
ConsumerRecords<String, String> records = consumer.poll(200);
while(true){
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.value());
}
}
问题是当我第一次 运行 消费者时,它没有得到值。我必须再次 运行 生产者和 运行 消费者才能获取值。有时我必须 运行 制片人 3 次。 为什么会这样?
2.) enable.auto.commit=false
如果 enable.auto.commit 属性 为假,同一个消费者是否可以多次读取消息?
3.) 考虑到我在第一个 point.How 中的消费者代码,我可以打破循环我的意思是消费者如何知道它已经阅读了所有消息然后调用 consumer.close()
1) 您是否总是在消费者中使用相同的 group.id?你是先生产再消费吗?这可能与消费群体和抵消管理有关。请参阅
2) 不确定您的意思是有意还是无意地重复阅读。只要消息未因主题保留政策而被删除,您始终可以再次阅读同一消息以寻找该位置。如果您的意思是不小心,自动提交设置为 false 仅意味着消费者不会为您提交偏移量,您必须手动调用 commitSync() 或 commitAsync() 来完成。在任何情况下,您的消费者仍有可能在提交之前处理消息并崩溃,在这种情况下,当消费者恢复时,它将再次读取那些已处理但未提交的消息。如果你想要 exactly once 语义,你必须做其他事情,比如用处理过的消息自动存储偏移量。
3) 正如Lhfcws提到的,在流中没有像"all messages"这样的概念。您可以做的一些事情(技巧)是:
- 您可以检查 poll 返回的记录列表是否为空,并且在经过一些配置的次数后中断循环并退出。
- 如果消息是有序的(您正在从单个分区读取),您可以发送一种特殊的 END_OF_DATA 消息,当您看到它时,您将关闭消费者。
- 你可以让消费者读取一些消息然后退出,下次它会从上次提交的偏移量继续。