Spring 云流 - 为给定的 PollableMessageSource 自动装配底层消费者

Spring cloud stream - Autowiring underlying Consumer for a given PollableMessageSource

是否可以为定义的 PollableMessageSource 获取底层 KafkaConsumer bean?

我将绑定定义为:

public interface TestBindings {

    String TEST_SOURCE =  "test";

    @Input(TEST_SOURCE)
    PollableMessageSource testTopic();
}

和配置 class:

@EnableBinding(TestBindings.class)
public class TestBindingsPoller {
    @Bean
    public ApplicationRunner testPoller(PollableMessageSource testTopic) {
        // Get kafka consumer for PollableMessageSource
        KafkaConsumer kafkaConsumer = getConsumer(testTopic);
        return args -> {
            while (true) {
                if (!testTopic.poll(...) {
                    Thread.sleep(500);
                }
            }
        };
    }
}

问题来了,如何获取testTopic对应的KafkaConsumer呢?有什么方法可以从 spring 云流中连接的 bean 中获取它?

KafkaMessageSourceKafkaConsumer 填充到 headers 中,因此可以在您收到消息的地方使用:https://github.com/spring-projects/spring-kafka/blob/master/spring-kafka/src/main/java/org/springframework/kafka/support/converter/MessageConverter.java#L57.

如果您打算自己做 poll 之类的事情,我建议您注入一个 ConsumerFactory 并从那里使用消费者。