以编程方式配置侦听器而不是使用注释

Configure listener programmatically instead of using annotations

read the documentation about receiving messages:

You can receive messages by configuring a MessageListenerContainer and providing a message listener or by using the @KafkaListener annotation.

但是我不能让它正常工作。我正在使用 Spring Boot 2.1.2 并且可能对 Spring 豆汤过度加盐,这对我来说弊大于利,所以我想了解它应该如何工作,所以我可以检查我在哪里偏离了荣耀的道路。

如果我对文档的理解正确,配置 MessageListenerContainer 就足够了,例如这里是:

@Configuration
public class MyKafkaConfiguration {
  @Bean
  public MessageListenerContainer myVeryOwnListener(ConsumerFactory<String, String> consumerFactory) {
    ContainerProperties cProps = new ContainerProperties(new TopicPartitionInitialOffset("spring-kafka-Whosebug-questions", /* partition */ 0, /* Offset */ 0L));
    KafkaMessageListenerContainer<String, String> result = new KafkaMessageListenerContainer<>(consumerFactory, cProps);
    result.setupMessageListener(MessageListener<String, String) System.out::println);
    return result;
  }
}

这无一例外地启动了,但似乎并没有真正收听代理上的任何消息。 从我从带有注释的通常流程中得到的信息来看,需要有人以 KafkaListenerEndpoint 的形式将侦听器注册到 KafkaListenerEndpointRegistry.

这是由 KafkaListenerAnnotationBeanPostPorcessor 为所有由 @KafkaListener 注释的方法自动完成的,但是在我想采用路径

的情况下,这应该如何工作

by configuring a MessageListenerContainer and providing a message listener

而不是

using the @KafkaListener annotation

我不太明白。 KafkaAutoConfiguration(Spring Boot 提供)中也没有方法,例如接受一个 List<MessageListenerContainer> 并自动将它们全部注册到注册表中,所以这并不奇怪。

但是,正如文档所建议的那样,它首先应该如何工作?我误解了那部分吗? 有人可以启发我吗?

该文档的下一部分说:

When you use a message listener container, you must provide a listener to receive data. There are currently eight supported interfaces for message listeners. The following listing shows these interfaces

下面还有一段Using KafkaMessageListenerContainer:

To assign a MessageListener to a container, you can use the ContainerProps.setMessageListener method when creating the Container. The following example shows how to do so:

ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
containerProps.setMessageListener(new MessageListener<Integer, String>() {
    ...
});
DefaultKafkaConsumerFactory<Integer, String> cf =
                        new DefaultKafkaConsumerFactory<Integer, String>(consumerProps());
KafkaMessageListenerContainer<Integer, String> container =
                        new KafkaMessageListenerContainer<>(cf, containerProps);
return container;

并且在该参考文献的开头有一个完整的样本:https://docs.spring.io/spring-kafka/docs/2.2.8.RELEASE/reference/html/#a-very-very-quick-example

@Test
public void testAutoCommit() throws Exception {
    logger.info("Start auto");
    ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
    final CountDownLatch latch = new CountDownLatch(4);
    containerProps.setMessageListener(new MessageListener<Integer, String>() {

        @Override
        public void onMessage(ConsumerRecord<Integer, String> message) {
            logger.info("received: " + message);
            latch.countDown();
        }

    });
    KafkaMessageListenerContainer<Integer, String> container = createContainer(containerProps);
    container.setBeanName("testAuto");
    container.start();
    Thread.sleep(1000); // wait a bit for the container to start
    KafkaTemplate<Integer, String> template = createTemplate();
    template.setDefaultTopic(topic1);
    template.sendDefault(0, "foo");
    template.sendDefault(2, "bar");
    template.sendDefault(0, "baz");
    template.sendDefault(2, "qux");
    template.flush();
    assertTrue(latch.await(60, TimeUnit.SECONDS));
    container.stop();
    logger.info("Stop auto");

}

我刚刚将你的 bean 复制到一个新的启动应用程序中,它工作得很好。

端点注册表仅适用于 @KafkaListener 容器,因为它们没有在应用程序上下文中注册为 beans(注册表就是 bean)。


@SpringBootApplication
public class So57628247Application {

    private static final int MessageListener = 0;

    public static void main(String[] args) {
        SpringApplication.run(So57628247Application.class, args);
    }

    @Bean
    public MessageListenerContainer myVeryOwnListener(ConsumerFactory<String, String> consumerFactory) {
        ContainerProperties cProps = new ContainerProperties(new TopicPartitionInitialOffset(
                "spring-kafka-Whosebug-questions", /* partition */ 0, /* Offset */ 0L));
        KafkaMessageListenerContainer<String, String> result = new KafkaMessageListenerContainer<>(consumerFactory,
                cProps);
        result.setupMessageListener((MessageListener<String, String>) System.out::println);
        return result;
    }

    @Bean
    public NewTopic topic() {
        return new NewTopic("spring-kafka-Whosebug-questions", 1, (short) 1);
    }


    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            template.send("spring-kafka-Whosebug-questions", "foo");
        };
    }

}

ConsumerRecord(topic = spring-kafka-Whosebug-questions, partition = 0, offset = 0, CreateTime = 1566573407373, serialized key size = -1, serialized value size = 3, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = foo)