Spring 云流 - 为给定的 PollableMessageSource 自动装配底层消费者
Spring cloud stream - Autowiring underlying Consumer for a given PollableMessageSource
是否可以为定义的 PollableMessageSource 获取底层 KafkaConsumer bean?
我将绑定定义为:
public interface TestBindings {
String TEST_SOURCE = "test";
@Input(TEST_SOURCE)
PollableMessageSource testTopic();
}
和配置 class:
@EnableBinding(TestBindings.class)
public class TestBindingsPoller {
@Bean
public ApplicationRunner testPoller(PollableMessageSource testTopic) {
// Get kafka consumer for PollableMessageSource
KafkaConsumer kafkaConsumer = getConsumer(testTopic);
return args -> {
while (true) {
if (!testTopic.poll(...) {
Thread.sleep(500);
}
}
};
}
}
问题来了,如何获取testTopic对应的KafkaConsumer呢?有什么方法可以从 spring 云流中连接的 bean 中获取它?
KafkaMessageSource
将 KafkaConsumer
填充到 headers 中,因此可以在您收到消息的地方使用:https://github.com/spring-projects/spring-kafka/blob/master/spring-kafka/src/main/java/org/springframework/kafka/support/converter/MessageConverter.java#L57.
如果您打算自己做 poll
之类的事情,我建议您注入一个 ConsumerFactory
并从那里使用消费者。
是否可以为定义的 PollableMessageSource 获取底层 KafkaConsumer bean?
我将绑定定义为:
public interface TestBindings {
String TEST_SOURCE = "test";
@Input(TEST_SOURCE)
PollableMessageSource testTopic();
}
和配置 class:
@EnableBinding(TestBindings.class)
public class TestBindingsPoller {
@Bean
public ApplicationRunner testPoller(PollableMessageSource testTopic) {
// Get kafka consumer for PollableMessageSource
KafkaConsumer kafkaConsumer = getConsumer(testTopic);
return args -> {
while (true) {
if (!testTopic.poll(...) {
Thread.sleep(500);
}
}
};
}
}
问题来了,如何获取testTopic对应的KafkaConsumer呢?有什么方法可以从 spring 云流中连接的 bean 中获取它?
KafkaMessageSource
将 KafkaConsumer
填充到 headers 中,因此可以在您收到消息的地方使用:https://github.com/spring-projects/spring-kafka/blob/master/spring-kafka/src/main/java/org/springframework/kafka/support/converter/MessageConverter.java#L57.
如果您打算自己做 poll
之类的事情,我建议您注入一个 ConsumerFactory
并从那里使用消费者。