Spring for Apache Kafka 2.3: setting an offset at runtime for a specific listener with KafkaMessageListenerContainer
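I have to implement a feature to (re)set a listener for a certain topic/partition to any given offset. So if events have been committed up to offset 5 and the admin decides to reset the offset to 2, events 3, 4 and 5 should be reprocessed.
We are using Spring for Kafka 2.3, and I tried to follow the documentation on ConsumerSeekAware, which seems to be exactly what I am looking for.
The problem, however, is that we also use topics that are created at runtime. For those we use a KafkaMessageListenerContainer created with a DefaultKafkaConsumerFactory, and I don't know where to put registerSeekCallback or anything similar.
Is there any way to achieve this? I don't understand how a class annotated with @KafkaListener maps to the way listeners are created in the factory.
Any help would be appreciated, even if it is only an explanation of how these things work together.
This is basically how the KafkaMessageListenerContainer is created: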
public KafkaMessageListenerContainer<String, Object> createKafkaMessageListenerContainer(String topicName,
        ContainerPropertiesStrategy containerPropertiesStrategy) {
    MessageListener<String, String> messageListener = getMessageListener(topicName);
    ConsumerFactory<String, Object> consumerFactory = new DefaultKafkaConsumerFactory<>(getConsumerFactoryConfiguration());
    KafkaMessageListenerContainer<String, Object> kafkaMessageListenerContainer = createKafkaMessageListenerContainer(
            topicName, messageListener, bootstrapServers, containerPropertiesStrategy, consumerFactory);
    return kafkaMessageListenerContainer;
}

public MessageListener<String, String> getMessageListener(String topic) {
    MessageListener<String, String> messageListener = new MessageListener<String, String>() {
        @Override
        public void onMessage(ConsumerRecord<String, String> message) {
            try {
                consumerService.consume(topic, message.value());
            } catch (IOException e) {
                log.log(Level.WARNING, "Message couldn't be consumed", e);
            }
        }
    };
    return messageListener;
}

public static KafkaMessageListenerContainer<String, Object> createKafkaMessageListenerContainer(
        String topicName, MessageListener<String, String> messageListener, String bootstrapServers,
        ContainerPropertiesStrategy containerPropertiesStrategy,
        ConsumerFactory<String, Object> consumerFactory) {
    ContainerProperties containerProperties = containerPropertiesStrategy.getContainerPropertiesForTopic(topicName);
    containerProperties.setMessageListener(messageListener);
    KafkaMessageListenerContainer<String, Object> kafkaMessageListenerContainer = new KafkaMessageListenerContainer<>(
            consumerFactory, containerProperties);
    kafkaMessageListenerContainer.setBeanName(topicName);
    return kafkaMessageListenerContainer;
}
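I hope this helps.
I think you could use some of the Spring Kafka annotations like this, although setting the offset in an annotation at runtime would be awkward: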
@KafkaListener(topicPartitions =
        @TopicPartition(topic = "${kafka.consumer.topic}", partitionOffsets = {
                @PartitionOffset(partition = "0", initialOffset = "2")}),
        containerFactory = "filterKafkaListenerContainerFactory", id = "${kafka.consumer.groupId}")
public void receive(ConsumedObject event) {
    log.info(String.format("Consumed message with correlationId: %s", event.getCorrelationId()));
    consumerHelper.start(event);
}
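Alternatively, here is some code I wrote to consume from a given offset. It simulates the consumer failing on a message, and it uses a plain KafkaConsumer rather than a KafkaMessageListenerContainer.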
private static void consumeFromOffset(KafkaConsumer<String, Customer> consumer, boolean flag, String topic) {
    // Ask the operator which offset to rewind to.
    Scanner scanner = new Scanner(System.in);
    System.out.print("Enter offset: ");
    long offsetInput = scanner.nextLong();
    while (true) {
        // poll(long) is deprecated; poll(Duration) needs java.time.Duration
        ConsumerRecords<String, Customer> records = consumer.poll(Duration.ofMillis(500));
        for (ConsumerRecord<String, Customer> record : records) {
            Customer customer = record.value();
            System.out.println(customer + " has offset ->" + record.offset());
            if (record.offset() == 7 && flag) {
                System.out.println("simulating consumer failing after offset 7..");
                break;
            }
        }
        consumer.commitSync();
        if (flag) {
            // consumer.seekToBeginning(Stream.of(new TopicPartition(topic, 0)).collect(Collectors.toList())); // consume from the beginning
            consumer.seek(new TopicPartition(topic, 0), offsetInput); // next poll() starts at the entered offset
            flag = false; // rewind only once
        }
    }
}
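One detail that is easy to get wrong here is the off-by-one: KafkaConsumer.seek(partition, n) makes the next poll() start at the record with offset n. The small sketch below works through the numbers from the question, assuming that "reset to 2" means offset 2 is the last record to treat as already processed. The class and method names (ReplayRange, seekTarget, replayed) are made up for illustration and are not part of Kafka or Spring.

```java
import java.util.ArrayList;
import java.util.List;

class ReplayRange {

    /** Offset to pass to seek() so that the record after lastKeptOffset is delivered first. */
    static long seekTarget(long lastKeptOffset) {
        return lastKeptOffset + 1;
    }

    /** Offsets that would be delivered again, up to the last offset processed before the reset. */
    static List<Long> replayed(long lastKeptOffset, long lastProcessedOffset) {
        List<Long> offsets = new ArrayList<>();
        for (long o = seekTarget(lastKeptOffset); o <= lastProcessedOffset; o++) {
            offsets.add(o);
        }
        return offsets;
    }

    public static void main(String[] args) {
        System.out.println(seekTarget(2));   // prints 3
        System.out.println(replayed(2, 5));  // prints [3, 4, 5]
    }
}
```

If the admin instead means "start again at offset 2", the seek target is simply 2 and record 2 is replayed as well, so it is worth agreeing on which meaning the UI exposes.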
The key component is AbstractConsumerSeekAware. Hopefully this is enough to get you started...
@SpringBootApplication
public class So59682801Application {

    public static void main(String[] args) {
        SpringApplication.run(So59682801Application.class, args).close();
    }

    @Bean
    public ApplicationRunner runner(ListenerCreator creator,
            KafkaTemplate<String, String> template, GenericApplicationContext context) {
        return args -> {
            System.out.println("Hit enter to create a listener");
            System.in.read();

            ConcurrentMessageListenerContainer<String, String> container =
                    creator.createContainer("so59682801group", "so59682801");

            // register the container as a bean so that all the "...Aware" interfaces are satisfied
            context.registerBean("so59682801", ConcurrentMessageListenerContainer.class, () -> container);
            context.getBean("so59682801", ConcurrentMessageListenerContainer.class); // re-fetch to initialize

            container.start();

            // send some messages
            IntStream.range(0, 10).forEach(i -> template.send("so59682801", "test" + i));

            System.out.println("Hit enter to reseek");
            System.in.read();
            ((MyListener) container.getContainerProperties().getMessageListener())
                    .reseek(new TopicPartition("so59682801", 0), 5L);

            System.out.println("Hit enter to exit");
            System.in.read();
        };
    }

}

@Component
class ListenerCreator {

    private final ConcurrentKafkaListenerContainerFactory<String, String> factory;

    ListenerCreator(ConcurrentKafkaListenerContainerFactory<String, String> factory) {
        factory.getContainerProperties().setIdleEventInterval(5000L);
        this.factory = factory;
    }

    ConcurrentMessageListenerContainer<String, String> createContainer(String groupId, String... topics) {
        ConcurrentMessageListenerContainer<String, String> container = factory.createContainer(topics);
        container.getContainerProperties().setGroupId(groupId);
        container.getContainerProperties().setMessageListener(new MyListener());
        return container;
    }

}

class MyListener extends AbstractConsumerSeekAware implements MessageListener<String, String> {

    @Override
    public void onMessage(ConsumerRecord<String, String> data) {
        System.out.println(data);
    }

    public void reseek(TopicPartition partition, long offset) {
        getSeekCallbackFor(partition).seek(partition.topic(), partition.partition(), offset);
    }

}
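Calling reseek() on the listener queues the seek for the consumer thread; it is performed when that thread wakes from poll() (actually, just before the next poll).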
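To illustrate why the seek is queued rather than executed immediately, here is a simplified, dependency-free model of the idea: any thread may request a seek, but only the consumer loop applies it, right before its next poll. This is only a sketch of the pattern and not Spring's actual implementation; FakeConsumer, QueuedSeekSketch, pendingSeeks and pollOnce are hypothetical names invented for the example, and the model covers a single partition.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

class QueuedSeekSketch {

    /** Hypothetical stand-in for a single-partition consumer; not a Kafka class. */
    static final class FakeConsumer {
        private final List<String> log; // records of one partition, list index == offset
        private long position = 0;

        FakeConsumer(List<String> log) {
            this.log = log;
        }

        void seek(long offset) {
            position = offset;
        }

        /** Returns up to max records starting at the current position, as "offset:value". */
        List<String> poll(int max) {
            List<String> batch = new ArrayList<>();
            while (batch.size() < max && position < log.size()) {
                batch.add(position + ":" + log.get((int) position));
                position++;
            }
            return batch;
        }
    }

    private final Queue<Long> pendingSeeks = new ConcurrentLinkedQueue<>();
    private final FakeConsumer consumer;

    QueuedSeekSketch(FakeConsumer consumer) {
        this.consumer = consumer;
    }

    /** May be called from any thread: it only enqueues the request. */
    void reseek(long offset) {
        pendingSeeks.add(offset);
    }

    /** One iteration of the consumer loop: apply queued seeks first, then poll. */
    List<String> pollOnce(int max) {
        Long offset;
        while ((offset = pendingSeeks.poll()) != null) {
            consumer.seek(offset);
        }
        return consumer.poll(max);
    }

    public static void main(String[] args) {
        FakeConsumer consumer = new FakeConsumer(
                List.of("test0", "test1", "test2", "test3", "test4", "test5"));
        QueuedSeekSketch loop = new QueuedSeekSketch(consumer);

        System.out.println(loop.pollOnce(10).size()); // prints 6
        loop.reseek(3);                               // e.g. requested by an admin thread
        System.out.println(loop.pollOnce(10));        // prints [3:test3, 4:test4, 5:test5]
    }
}
```

The reason for this shape is that a KafkaConsumer is not safe for use from multiple threads, so a request coming from an admin endpoint has to be handed over to the thread that owns the consumer instead of calling seek() directly.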