下一次轮询中未收到未提交的事件
Uncommitted event is not received in the next poll
我有一个消费者,max.poll.records 设置为 1,enable.auto.commit 设置为 false,用于手动偏移控制。但是,即使我没有调用 commitSync,后续轮询也会返回下一个事件。这是详细信息,我在一个主题上产生了 4 个事件,在消费者中我没有为第三个事件提交我正在跳过 commitSync,我期待在下一个轮询中返回第三个事件,但第四个事件已经返回。我很困惑 event 3 是如何提交的。
private static void pauseAndResume() {
int retryDelay = 5; // seconds
SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
SimpleProducer.produce(4); //(produces Event1, Event2, Event3, Event4)
Properties properties = new Properties();
String topicName = "output-topic";
properties.put("bootstrap.servers", "localhost:29092");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("group.id", "test-group");
properties.put("max.poll.records", 1);
properties.put("enable.auto.commit", false);
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);
List<String> topics = new ArrayList<String>();
topics.add(topicName);
kafkaConsumer.subscribe(topics);
Collection<TopicPartition> topicPartitions = new ArrayList<TopicPartition>();
PartitionInfo partitionInfo = kafkaConsumer.partitionsFor(topicName).get(0);
topicPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
int eventsCount = 0;
try {
Date pausedAt = new Date();
while (true) {;
if (!kafkaConsumer.paused().isEmpty()) {
if ((new Date().getTime() - pausedAt.getTime()) / 1000 % 60 >= retryDelay) {
System.out.println("Resuming Consumer..." + sdf.format(new Date()));
kafkaConsumer.resume(topicPartitions);
}
}
ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.println(eventsCount + ":" + record.value());
if (record.value().equals("Event3")) {
System.out.println("consumer is pausing...... for about " + retryDelay + " seconds " + sdf.format(new Date()));
kafkaConsumer.pause(topicPartitions);
pausedAt = new Date();
break;
}else {
kafkaConsumer.commitSync();
}
}
}
} catch (Exception e) {
System.out.println(e.getMessage());
} finally {
kafkaConsumer.close();
}
}
link KafkaConsumer<K,V> 没有说明如何停止偏移前进):
我认为一些智能内部检测到 Event3 的不确定轮询并返回 Event4
根据我的研究(google 和 Kafka 论坛)我希望 Event3 重播,因为它没有提交,但它没有发生,请有人指出我正确的方向。
非常感谢
我想出了一个解决方法来明确搜索主题分区
//In this use case we are consuming from single topic which has only one partition
kafkaConsumer.seek(topicPartitions.iterator().next(), record.offset());
我有一个消费者,max.poll.records 设置为 1,enable.auto.commit 设置为 false,用于手动偏移控制。但是,即使我没有调用 commitSync,后续轮询也会返回下一个事件。这是详细信息,我在一个主题上产生了 4 个事件,在消费者中我没有为第三个事件提交我正在跳过 commitSync,我期待在下一个轮询中返回第三个事件,但第四个事件已经返回。我很困惑 event 3 是如何提交的。
private static void pauseAndResume() {
int retryDelay = 5; // seconds
SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
SimpleProducer.produce(4); //(produces Event1, Event2, Event3, Event4)
Properties properties = new Properties();
String topicName = "output-topic";
properties.put("bootstrap.servers", "localhost:29092");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("group.id", "test-group");
properties.put("max.poll.records", 1);
properties.put("enable.auto.commit", false);
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);
List<String> topics = new ArrayList<String>();
topics.add(topicName);
kafkaConsumer.subscribe(topics);
Collection<TopicPartition> topicPartitions = new ArrayList<TopicPartition>();
PartitionInfo partitionInfo = kafkaConsumer.partitionsFor(topicName).get(0);
topicPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
int eventsCount = 0;
try {
Date pausedAt = new Date();
while (true) {;
if (!kafkaConsumer.paused().isEmpty()) {
if ((new Date().getTime() - pausedAt.getTime()) / 1000 % 60 >= retryDelay) {
System.out.println("Resuming Consumer..." + sdf.format(new Date()));
kafkaConsumer.resume(topicPartitions);
}
}
ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.println(eventsCount + ":" + record.value());
if (record.value().equals("Event3")) {
System.out.println("consumer is pausing...... for about " + retryDelay + " seconds " + sdf.format(new Date()));
kafkaConsumer.pause(topicPartitions);
pausedAt = new Date();
break;
}else {
kafkaConsumer.commitSync();
}
}
}
} catch (Exception e) {
System.out.println(e.getMessage());
} finally {
kafkaConsumer.close();
}
}
link KafkaConsumer<K,V> 没有说明如何停止偏移前进): 我认为一些智能内部检测到 Event3 的不确定轮询并返回 Event4 根据我的研究(google 和 Kafka 论坛)我希望 Event3 重播,因为它没有提交,但它没有发生,请有人指出我正确的方向。
非常感谢
我想出了一个解决方法来明确搜索主题分区
//In this use case we are consuming from single topic which has only one partition
kafkaConsumer.seek(topicPartitions.iterator().next(), record.offset());