Configure listener programmatically instead of using annotations
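I read the documentation about receiving messages:
You can receive messages by configuring a MessageListenerContainer and providing a message listener or by using the @KafkaListener annotation.
But I can't get it to work. I am using Spring Boot 2.1.2 and may have oversalted the Spring bean soup, which does me more harm than good, so I would like to understand how this is supposed to work so that I can check where I strayed from the path of glory.
If I understand the documentation correctly, configuring a MessageListenerContainer is enough, for example like this: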
@Configuration
public class MyKafkaConfiguration {

    @Bean
    public MessageListenerContainer myVeryOwnListener(ConsumerFactory<String, String> consumerFactory) {
        ContainerProperties cProps = new ContainerProperties(
                new TopicPartitionInitialOffset("spring-kafka-Whosebug-questions", /* partition */ 0, /* offset */ 0L));
        KafkaMessageListenerContainer<String, String> result =
                new KafkaMessageListenerContainer<>(consumerFactory, cProps);
        // The cast gives the method reference its target type.
        result.setupMessageListener((MessageListener<String, String>) System.out::println);
        return result;
    }
}
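This starts without any exception, but it does not seem to actually listen for any messages on the broker.
From what I have gathered from the usual annotation-based flow, someone needs to register the listener with the KafkaListenerEndpointRegistry in the form of a KafkaListenerEndpoint.
This is done automatically by the KafkaListenerAnnotationBeanPostProcessor for all methods annotated with @KafkaListener, but I don't quite understand how this is supposed to work when I want to take the path
by configuring a MessageListenerContainer and providing a message listener
instead of
using the @KafkaListener annotation
There is also no method in KafkaAutoConfiguration (provided by Spring Boot) that, for example, accepts a List<MessageListenerContainer> and automatically registers them all with the registry, so this is not surprising.
But how is it supposed to work in the first place, as the documentation suggests? Did I misunderstand that part?
Can someone enlighten me?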
The next section of that documentation says:
When you use a message listener container, you must provide a listener to receive data. There are currently eight supported interfaces for message listeners. The following listing shows these interfaces
Further down there is also a section, Using KafkaMessageListenerContainer:
To assign a MessageListener to a container, you can use the ContainerProps.setMessageListener method when creating the Container. The following example shows how to do so:
ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
containerProps.setMessageListener(new MessageListener<Integer, String>() {
    ...
});
DefaultKafkaConsumerFactory<Integer, String> cf =
        new DefaultKafkaConsumerFactory<Integer, String>(consumerProps());
KafkaMessageListenerContainer<Integer, String> container =
        new KafkaMessageListenerContainer<>(cf, containerProps);
return container;
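One detail that is easy to trip over: as far as I know, both ContainerProperties.setMessageListener and MessageListenerContainer.setupMessageListener take a plain Object parameter, so a lambda or method reference passed to them has no target type and needs an explicit cast to the listener interface. The plain-Java sketch below shows the effect with stand-in types only; `RecordListener` and `register` are hypothetical names for illustration and are not part of Spring Kafka.

```java
public class ListenerCastSketch {

    /** Hypothetical stand-in for a listener interface such as MessageListener<K, V>. */
    @FunctionalInterface
    public interface RecordListener<V> {
        void onMessage(V value);
    }

    /** Hypothetical stand-in for a setter that accepts Object rather than a listener type. */
    public static String register(Object listener) {
        return listener instanceof RecordListener ? "record listener" : "unsupported";
    }

    public static void main(String[] args) {
        // register(System.out::println);
        // The line above does not compile: Object is not a functional interface,
        // so the method reference has nothing to convert to.

        // The cast supplies the target type that the Object parameter cannot.
        String kind = register((RecordListener<String>) System.out::println);
        System.out.println(kind);
    }
}
```

The same reasoning explains the cast in the bean definition from the question: without it, the compiler cannot tell which listener interface the method reference is meant to implement.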
And there is a complete sample at the beginning of the reference: https://docs.spring.io/spring-kafka/docs/2.2.8.RELEASE/reference/html/#a-very-very-quick-example
@Test
public void testAutoCommit() throws Exception {
    logger.info("Start auto");
    ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
    final CountDownLatch latch = new CountDownLatch(4);
    containerProps.setMessageListener(new MessageListener<Integer, String>() {

        @Override
        public void onMessage(ConsumerRecord<Integer, String> message) {
            logger.info("received: " + message);
            latch.countDown();
        }

    });
    KafkaMessageListenerContainer<Integer, String> container = createContainer(containerProps);
    container.setBeanName("testAuto");
    container.start();
    Thread.sleep(1000); // wait a bit for the container to start
    KafkaTemplate<Integer, String> template = createTemplate();
    template.setDefaultTopic(topic1);
    template.sendDefault(0, "foo");
    template.sendDefault(2, "bar");
    template.sendDefault(0, "baz");
    template.sendDefault(2, "qux");
    template.flush();
    assertTrue(latch.await(60, TimeUnit.SECONDS));
    container.stop();
    logger.info("Stop auto");
}
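The sample above relies on a CountDownLatch so that the test thread blocks until the listener, which runs on the container's own thread, has seen all four records. Here is a minimal plain-Java sketch of just that waiting pattern, with no Kafka involved; `LatchWaitSketch` and `deliverAndWait` are made-up names, and a single-thread executor stands in for the container's consumer thread.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class LatchWaitSketch {

    /** Delivers the values on a background thread and waits until the listener has seen them all. */
    public static List<String> deliverAndWait(List<String> values, long timeoutSeconds) {
        List<String> received = new CopyOnWriteArrayList<>();
        CountDownLatch latch = new CountDownLatch(values.size());

        // Plays the role of the message listener: record the value, then count down.
        Consumer<String> listener = value -> {
            received.add(value);
            latch.countDown();
        };

        // Plays the role of the container's consumer thread (an assumption of this sketch).
        ExecutorService consumerThread = Executors.newSingleThreadExecutor();
        try {
            consumerThread.execute(() -> values.forEach(listener));
            // Block the calling thread until every value was handled or the timeout expires.
            if (!latch.await(timeoutSeconds, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for messages");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } finally {
            consumerThread.shutdown();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(deliverAndWait(Arrays.asList("foo", "bar", "baz", "qux"), 60));
    }
}
```

Waiting on the latch instead of sleeping for a fixed time means the caller returns as soon as the last message has been handled, and fails clearly if it never arrives.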
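I just copied your bean into a new Boot application and it works fine.
The endpoint registry is only for @KafkaListener containers, because they are not registered as beans in the application context (the registry is the bean).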
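import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.support.TopicPartitionInitialOffset;

@SpringBootApplication
public class So57628247Application {

    public static void main(String[] args) {
        SpringApplication.run(So57628247Application.class, args);
    }

    // Declared as a bean, so the application context manages the container's lifecycle.
    @Bean
    public MessageListenerContainer myVeryOwnListener(ConsumerFactory<String, String> consumerFactory) {
        ContainerProperties cProps = new ContainerProperties(new TopicPartitionInitialOffset(
                "spring-kafka-Whosebug-questions", /* partition */ 0, /* offset */ 0L));
        KafkaMessageListenerContainer<String, String> result = new KafkaMessageListenerContainer<>(consumerFactory,
                cProps);
        result.setupMessageListener((MessageListener<String, String>) System.out::println);
        return result;
    }

    // Creates the topic with one partition and a replication factor of one.
    @Bean
    public NewTopic topic() {
        return new NewTopic("spring-kafka-Whosebug-questions", 1, (short) 1);
    }

    // Sends a single test record once the application has started.
    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            template.send("spring-kafka-Whosebug-questions", "foo");
        };
    }

}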
and
ConsumerRecord(topic = spring-kafka-Whosebug-questions, partition = 0, offset = 0, CreateTime = 1566573407373, serialized key size = -1, serialized value size = 3, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = foo)
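To make the bean-versus-registry distinction concrete, here is a deliberately simplified plain-Java model. None of these types are Spring's: `Lifecycle`, `Container`, `Registry`, and `MiniContext` are hypothetical stand-ins. The sketch assumes, as I understand the behavior, that the application context starts lifecycle beans with auto-startup enabled, and that the registry, being a bean itself, starts the annotation-created containers it holds.

```java
import java.util.ArrayList;
import java.util.List;

public class LifecycleSketch {

    /** Hypothetical stand-in for a lifecycle contract that a context can start. */
    public interface Lifecycle {
        void start();
    }

    /** Hypothetical stand-in for a listener container. */
    public static class Container implements Lifecycle {
        private final String name;
        private final List<String> log;

        public Container(String name, List<String> log) {
            this.name = name;
            this.log = log;
        }

        @Override
        public void start() {
            log.add("started " + name);
        }
    }

    /** Hypothetical stand-in for the endpoint registry: a bean that starts the containers it holds. */
    public static class Registry implements Lifecycle {
        private final List<Container> containers = new ArrayList<>();

        public void register(Container container) {
            containers.add(container);
        }

        @Override
        public void start() {
            containers.forEach(Container::start);
        }
    }

    /** Hypothetical stand-in for the application context: it starts every registered bean. */
    public static class MiniContext {
        private final List<Lifecycle> beans = new ArrayList<>();

        public void registerBean(Lifecycle bean) {
            beans.add(bean);
        }

        public void refresh() {
            beans.forEach(Lifecycle::start);
        }
    }

    public static List<String> run() {
        List<String> log = new ArrayList<>();
        MiniContext context = new MiniContext();

        // A container declared as a bean: the context starts it directly.
        context.registerBean(new Container("myVeryOwnListener", log));

        // An annotation-style container: not a bean, reachable only through the registry bean.
        Registry registry = new Registry();
        registry.register(new Container("annotatedListener", log));
        context.registerBean(registry);

        context.refresh();
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Both containers end up started, but by different routes, which is why a container you declare as a bean never needs to be registered with the registry.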