Kafka 在 Consumer.Poll(Duration.ZERO) 之后没有分配分区;
Kafka is not assigning a partition after Consumer.Poll(Duration.ZERO);
我开始了一个项目,我在其中实现了 appache kafka。
我已经有一个工作的生产者将数据写入队列。到目前为止,一切都很好。现在我想编写一个读取队列中所有数据的消费者。
即对应代码:
try {
consumer.subscribe(Collections.singletonList("names"));
if (startingPoint != null){
consumer.
consumer.poll(Duration.ofMillis(0));
consumer.seekToBeginning(consumer.assignment());
}
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
for (ConsumerRecord<String, String> record : records) {
keyValuePairs.add(new String[]{record.key(),record.value()});
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
} catch (Exception e) {
e.printStackTrace();
} finally {
consumer.close();
}
该代码现在无法正常工作。仅消耗新记录。
我能够发现
seekToBeginning() 不起作用,因为那一刻没有分区分配给消费者。
如果我增加投票的持续时间,它就会起作用。另一方面,如果我只是暂停线程,它不会。
有人可以解释一下为什么会这样吗?我试图自己找出答案,并且已经阅读了一些有关 Kafka 心跳的内容。但是我还没有完全理解到底发生了什么。
作业需要时间;轮询 0 通常意味着轮询将在它发生之前退出。
您应该向 subscribe() 方法添加一个 ConsumerRebalanceListener
回调并在 onPartitionsAssigned()
中执行查找。
编辑
@SpringBootApplication
public class So69121558Application {
public static void main(String[] args) {
SpringApplication.run(So69121558Application.class, args);
}
@Bean
public ApplicationRunner runner(ConsumerFactory<String, String> cf, KafkaTemplate<String, String> template) {
return args -> {
template.send("so69121558", "test");
Consumer<String, String> consumer = cf.createConsumer("group", "");
consumer.subscribe(Collections.singletonList("so69121558"), new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
consumer.seekToBeginning(partitions);
}
});
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
records.forEach(System.out::println);
Thread.sleep(5000);
consumer.close();
};
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("so69121558").partitions(1).replicas(1).build();
}
}
这里有几个以 Spring 方式进行操作的示例 - 只需将其中一个(或两个)添加到上面的 class.
@KafkaListener(id = "so69121558", topics = "so69121558")
void listen(ConsumerRecord<?, ?> rec) {
System.out.println(rec);
}
@KafkaListener(id = "so69121558-1", topics = "so69121558")
void pojoListen(String in) {
System.out.println(in);
}
查找的方式也有点不同;这是完整的示例:
@SpringBootApplication
public class So69121558Application extends AbstractConsumerSeekAware {
public static void main(String[] args) {
SpringApplication.run(So69121558Application.class, args);
}
@KafkaListener(id = "so69121558", topics = "so69121558")
void listen(ConsumerRecord<?, ?> rec) {
System.out.println(rec);
}
@KafkaListener(id = "so69121558-1", topics = "so69121558")
void pojoListen(String in) {
System.out.println(in);
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("so69121558").partitions(1).replicas(1).build();
}
@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
callback.seekToBeginning(assignments.keySet());
}
}
我开始了一个项目,我在其中实现了 appache kafka。
我已经有一个工作的生产者将数据写入队列。到目前为止,一切都很好。现在我想编写一个读取队列中所有数据的消费者。
即对应代码:
try {
consumer.subscribe(Collections.singletonList("names"));
if (startingPoint != null){
consumer.
consumer.poll(Duration.ofMillis(0));
consumer.seekToBeginning(consumer.assignment());
}
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
for (ConsumerRecord<String, String> record : records) {
keyValuePairs.add(new String[]{record.key(),record.value()});
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
} catch (Exception e) {
e.printStackTrace();
} finally {
consumer.close();
}
该代码现在无法正常工作。仅消耗新记录。 我能够发现 seekToBeginning() 不起作用,因为那一刻没有分区分配给消费者。 如果我增加投票的持续时间,它就会起作用。另一方面,如果我只是暂停线程,它不会。
有人可以解释一下为什么会这样吗?我试图自己找出答案,并且已经阅读了一些有关 Kafka 心跳的内容。但是我还没有完全理解到底发生了什么。
作业需要时间;轮询 0 通常意味着轮询将在它发生之前退出。
您应该向 subscribe() 方法添加一个 ConsumerRebalanceListener
回调并在 onPartitionsAssigned()
中执行查找。
编辑
@SpringBootApplication
public class So69121558Application {
public static void main(String[] args) {
SpringApplication.run(So69121558Application.class, args);
}
@Bean
public ApplicationRunner runner(ConsumerFactory<String, String> cf, KafkaTemplate<String, String> template) {
return args -> {
template.send("so69121558", "test");
Consumer<String, String> consumer = cf.createConsumer("group", "");
consumer.subscribe(Collections.singletonList("so69121558"), new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
consumer.seekToBeginning(partitions);
}
});
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
records.forEach(System.out::println);
Thread.sleep(5000);
consumer.close();
};
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("so69121558").partitions(1).replicas(1).build();
}
}
这里有几个以 Spring 方式进行操作的示例 - 只需将其中一个(或两个)添加到上面的 class.
@KafkaListener(id = "so69121558", topics = "so69121558")
void listen(ConsumerRecord<?, ?> rec) {
System.out.println(rec);
}
@KafkaListener(id = "so69121558-1", topics = "so69121558")
void pojoListen(String in) {
System.out.println(in);
}
查找的方式也有点不同;这是完整的示例:
@SpringBootApplication
public class So69121558Application extends AbstractConsumerSeekAware {
public static void main(String[] args) {
SpringApplication.run(So69121558Application.class, args);
}
@KafkaListener(id = "so69121558", topics = "so69121558")
void listen(ConsumerRecord<?, ?> rec) {
System.out.println(rec);
}
@KafkaListener(id = "so69121558-1", topics = "so69121558")
void pojoListen(String in) {
System.out.println(in);
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("so69121558").partitions(1).replicas(1).build();
}
@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
callback.seekToBeginning(assignments.keySet());
}
}