在 spring kafka 中为心跳使用侦听器容器
Use listener container for heartbeats in spring kafka
我正在尝试设置一个容器来监听心跳主题——我只关心监听器容器何时进入空闲状态。是否可以不使用 @KafkaListener,而只用 EventListener 监听空闲事件来配置容器?是否可以在空闲事件侦听器中执行 seek(重新定位偏移量),以读取主题的最后一条消息,从而获得时间和其他详细信息等上下文信息?
是否有一个事件可以知道消费者在空闲事件后开始消费而不需要使用@KafkaListener?
没有消费重启事件
不清楚您所说的“无需使用 @KafkaListener”是什么意思;您可以创建自己的侦听器……
/**
 * Boot application for the idle-event demo. It declares a listener container that is
 * built directly from the container factory (no {@code @KafkaListener}), the topic
 * definition, and a runner that sends a single record to that topic.
 */
@SpringBootApplication
public class So63957429Application {

    /** Topic name shared by the container, the topic definition and the runner. */
    private static final String TOPIC = "so63957429";

    public static void main(String[] args) {
        SpringApplication.run(So63957429Application.class, args);
    }

    /**
     * Creates a container for the demo topic and installs {@code myListener} as its
     * message listener.
     */
    @Bean
    public ConcurrentMessageListenerContainer<String, String> container(
            ConcurrentKafkaListenerContainerFactory<String, String> factory, MyListener myListener) {

        ConcurrentMessageListenerContainer<String, String> listenerContainer =
                factory.createContainer(TOPIC);
        listenerContainer.getContainerProperties().setMessageListener(myListener);
        return listenerContainer;
    }

    /** Declares the demo topic with one partition and a replication factor of one. */
    @Bean
    public NewTopic topic() {
        return TopicBuilder.name(TOPIC)
                .partitions(1)
                .replicas(1)
                .build();
    }

    /** Sends the value {@code "foo"} to the demo topic when the runner is invoked. */
    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            template.send(TOPIC, "foo");
        };
    }
}
/**
 * Message listener that prints each received record and, on every container idle
 * event, seeks each registered partition callback one record back from its current
 * position.
 */
@Component
class MyListener extends AbstractConsumerSeekAware implements MessageListener<String, String> {

    /** Prints the record in its short topic-partition@offset form. */
    @Override
    public void onMessage(ConsumerRecord<String, String> record) {
        String description = ListenerUtils.recordToString(record, true);
        System.out.println(description);
    }

    /** Reacts to an idle event by rewinding every known partition by one record. */
    @EventListener
    public void listen(ListenerContainerIdleEvent event) {
        System.out.println("Idle; rewinding to last");
        getSeekCallbacks().forEach((topicPartition, seekCallback) -> {
            // -1 with toCurrent=true: one record back from the current position.
            seekCallback.seekRelative(topicPartition.topic(), topicPartition.partition(), -1, true);
        });
    }
}
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.group-id=so63957429
spring.kafka.listener.idle-event-interval=5000
so63957429-0@0
Idle; rewinding to last
2020-09-18 10:50:54.408 INFO 17195 --- [container-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-so63957429-1, groupId=so63957429] Seeking to offset 0 for partition so63957429-0
so63957429-0@0
Idle; rewinding to last
2020-09-18 10:50:59.436 INFO 17195 --- [container-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-so63957429-1, groupId=so63957429] Seeking to offset 0 for partition so63957429-0
so63957429-0@0
编辑
如果需要访问最后消费的记录,可以添加 RecordInterceptor:
/**
 * Creates the container for the demo topic and registers {@code myListener} twice:
 * as the message listener and as the record interceptor.
 */
@Bean
public ConcurrentMessageListenerContainer<String, String> container(
        ConcurrentKafkaListenerContainerFactory<String, String> factory, MyListener myListener) {

    ConcurrentMessageListenerContainer<String, String> listenerContainer =
            factory.createContainer("so63957429");
    listenerContainer.setRecordInterceptor(myListener);
    listenerContainer.getContainerProperties().setMessageListener(myListener);
    return listenerContainer;
}
/**
 * Message listener that also acts as the container's record interceptor so it can
 * remember the most recently intercepted record and report it when the container
 * goes idle.
 *
 * <p>The record is kept in a single field, so this only suits a container with one
 * consumer thread; with a concurrency greater than one the last record would have to
 * be tracked per thread (for example in a {@code ThreadLocal}).
 */
@Component
class MyListener extends AbstractConsumerSeekAware implements MessageListener<String, String>,
        RecordInterceptor<String, String> {

    /** Most recently intercepted record; {@code null} until the first record arrives. */
    private ConsumerRecord<String, String> lastRecord;

    /** Prints the record in its short topic-partition@offset form. */
    @Override
    public void onMessage(ConsumerRecord<String, String> record) {
        System.out.println(ListenerUtils.recordToString(record, true));
    }

    /**
     * Prints the idle event followed by the last intercepted record, or {@code none}
     * when no record has been intercepted yet.
     */
    @EventListener
    public void listen(ListenerContainerIdleEvent event) {
        System.out.println(event);
        ConsumerRecord<String, String> last = this.lastRecord;
        // An idle event can arrive before any record was consumed; formatting a null
        // record would throw a NullPointerException.
        String description = last != null ? ListenerUtils.recordToString(last, true) : "none";
        System.out.println("Last Record: " + description);
        // Optional: rewind one record so the last message is delivered again.
        // getSeekCallbacks().forEach((tp, callback) -> callback.seekRelative(tp.topic(), tp.partition(), -1, true));
    }

    /** Prints the event signalling that the container is no longer idle. */
    @EventListener
    public void listen(ListenerContainerNoLongerIdleEvent event) {
        System.out.println(event);
    }

    /**
     * Remembers the record and passes it through unchanged.
     *
     * @param record the record about to be handed to the listener
     * @return the same record
     */
    @Override
    @Nullable
    public ConsumerRecord<String, String> intercept(ConsumerRecord<String, String> record) {
        this.lastRecord = record;
        return record;
    }
}
如果并发数 > 1,则需要将最后一条记录存储在 ThreadLocal 中。
我正在尝试设置一个容器来监听心跳主题——我只关心监听器容器何时进入空闲状态。是否可以不使用 @KafkaListener,而只用 EventListener 监听空闲事件来配置容器?是否可以在空闲事件侦听器中执行 seek(重新定位偏移量),以读取主题的最后一条消息,从而获得时间和其他详细信息等上下文信息?
是否有一个事件可以知道消费者在空闲事件后开始消费而不需要使用@KafkaListener?
没有消费重启事件
不清楚您所说的“无需使用 @KafkaListener”是什么意思;您可以创建自己的侦听器……
/**
 * Boot application for the idle-event demo. It declares a listener container that is
 * built directly from the container factory (no {@code @KafkaListener}), the topic
 * definition, and a runner that sends a single record to that topic.
 */
@SpringBootApplication
public class So63957429Application {

    /** Topic name shared by the container, the topic definition and the runner. */
    private static final String TOPIC = "so63957429";

    public static void main(String[] args) {
        SpringApplication.run(So63957429Application.class, args);
    }

    /**
     * Creates a container for the demo topic and installs {@code myListener} as its
     * message listener.
     */
    @Bean
    public ConcurrentMessageListenerContainer<String, String> container(
            ConcurrentKafkaListenerContainerFactory<String, String> factory, MyListener myListener) {

        ConcurrentMessageListenerContainer<String, String> listenerContainer =
                factory.createContainer(TOPIC);
        listenerContainer.getContainerProperties().setMessageListener(myListener);
        return listenerContainer;
    }

    /** Declares the demo topic with one partition and a replication factor of one. */
    @Bean
    public NewTopic topic() {
        return TopicBuilder.name(TOPIC)
                .partitions(1)
                .replicas(1)
                .build();
    }

    /** Sends the value {@code "foo"} to the demo topic when the runner is invoked. */
    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            template.send(TOPIC, "foo");
        };
    }
}
/**
 * Message listener that prints each received record and, on every container idle
 * event, seeks each registered partition callback one record back from its current
 * position.
 */
@Component
class MyListener extends AbstractConsumerSeekAware implements MessageListener<String, String> {

    /** Prints the record in its short topic-partition@offset form. */
    @Override
    public void onMessage(ConsumerRecord<String, String> record) {
        String description = ListenerUtils.recordToString(record, true);
        System.out.println(description);
    }

    /** Reacts to an idle event by rewinding every known partition by one record. */
    @EventListener
    public void listen(ListenerContainerIdleEvent event) {
        System.out.println("Idle; rewinding to last");
        getSeekCallbacks().forEach((topicPartition, seekCallback) -> {
            // -1 with toCurrent=true: one record back from the current position.
            seekCallback.seekRelative(topicPartition.topic(), topicPartition.partition(), -1, true);
        });
    }
}
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.group-id=so63957429
spring.kafka.listener.idle-event-interval=5000
so63957429-0@0
Idle; rewinding to last
2020-09-18 10:50:54.408 INFO 17195 --- [container-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-so63957429-1, groupId=so63957429] Seeking to offset 0 for partition so63957429-0
so63957429-0@0
Idle; rewinding to last
2020-09-18 10:50:59.436 INFO 17195 --- [container-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-so63957429-1, groupId=so63957429] Seeking to offset 0 for partition so63957429-0
so63957429-0@0
编辑
如果需要访问最后消费的记录,可以添加 RecordInterceptor:
/**
 * Creates the container for the demo topic and registers {@code myListener} twice:
 * as the message listener and as the record interceptor.
 */
@Bean
public ConcurrentMessageListenerContainer<String, String> container(
        ConcurrentKafkaListenerContainerFactory<String, String> factory, MyListener myListener) {

    ConcurrentMessageListenerContainer<String, String> listenerContainer =
            factory.createContainer("so63957429");
    listenerContainer.setRecordInterceptor(myListener);
    listenerContainer.getContainerProperties().setMessageListener(myListener);
    return listenerContainer;
}
/**
 * Message listener that also acts as the container's record interceptor so it can
 * remember the most recently intercepted record and report it when the container
 * goes idle.
 *
 * <p>The record is kept in a single field, so this only suits a container with one
 * consumer thread; with a concurrency greater than one the last record would have to
 * be tracked per thread (for example in a {@code ThreadLocal}).
 */
@Component
class MyListener extends AbstractConsumerSeekAware implements MessageListener<String, String>,
        RecordInterceptor<String, String> {

    /** Most recently intercepted record; {@code null} until the first record arrives. */
    private ConsumerRecord<String, String> lastRecord;

    /** Prints the record in its short topic-partition@offset form. */
    @Override
    public void onMessage(ConsumerRecord<String, String> record) {
        System.out.println(ListenerUtils.recordToString(record, true));
    }

    /**
     * Prints the idle event followed by the last intercepted record, or {@code none}
     * when no record has been intercepted yet.
     */
    @EventListener
    public void listen(ListenerContainerIdleEvent event) {
        System.out.println(event);
        ConsumerRecord<String, String> last = this.lastRecord;
        // An idle event can arrive before any record was consumed; formatting a null
        // record would throw a NullPointerException.
        String description = last != null ? ListenerUtils.recordToString(last, true) : "none";
        System.out.println("Last Record: " + description);
        // Optional: rewind one record so the last message is delivered again.
        // getSeekCallbacks().forEach((tp, callback) -> callback.seekRelative(tp.topic(), tp.partition(), -1, true));
    }

    /** Prints the event signalling that the container is no longer idle. */
    @EventListener
    public void listen(ListenerContainerNoLongerIdleEvent event) {
        System.out.println(event);
    }

    /**
     * Remembers the record and passes it through unchanged.
     *
     * @param record the record about to be handed to the listener
     * @return the same record
     */
    @Override
    @Nullable
    public ConsumerRecord<String, String> intercept(ConsumerRecord<String, String> record) {
        this.lastRecord = record;
        return record;
    }
}
如果并发数 > 1,则需要将最后一条记录存储在 ThreadLocal 中。