Kafka Streams programmatic configuration not working

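I am building a Spring Boot application and trying to configure Kafka programmatically, but for some reason the properties are still taken from application.yaml rather than from the values I set in code.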

@Configuration
public class KafkaConfiguration {
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(kafkaConsumerFactory());
        factory.setConcurrency(1);
        factory.getContainerProperties().setPollTimeout(30000);

        return factory;
    }

    public ConsumerFactory<String, String> kafkaConsumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "aaa"); // deliberately invalid: startup should fail if this value were actually used
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "app1");
        return new DefaultKafkaConsumerFactory<>(props);
    }
}

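@Component
// The class name must differ from the @StreamListener annotation; otherwise the simple name resolves to this class.
public class TestStreamListener {
    private static final Logger log = LoggerFactory.getLogger(TestStreamListener.class);

    @StreamListener(TestStreams.TEST_STREAM_IN)
    public void testStream(@Payload GenericCustomEvent response, @Headers MessageHeaders headers) throws Exception {
        log.debug("Received generic event {} with headers {}", response, headers);
    }
}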

public interface TestStreams {
    String TEST_STREAM_IN = "test-stream-in";

    @Input(TEST_STREAM_IN)
    SubscribableChannel inputTestStream();
}

@EnableBinding({TestStreams.class})
@SpringBootApplication
public class KafkaApplication {
    public static void main(String[] args) {
        SpringApplication.run(KafkaApplication.class, args);
    }
}

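The binder does not use the KafkaListenerContainerFactory; it creates the listener container itself from the properties in the YAML file.

You can modify that container by adding a ListenerContainerCustomizer bean.

See the example here: Can I apply graceful shutdown when using Spring Cloud Stream Kafka 3.0.3.RELEASE?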
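
As a minimal sketch of such a customizer, assuming the Kafka binder and reusing the 30-second poll timeout from the question: the class name BinderContainerConfiguration and the bean method name are made up for illustration, and the choice of setPollTimeout is only an example of a container property you might adjust.

```java
import org.springframework.cloud.stream.config.ListenerContainerCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;

// Hypothetical configuration class; the name is an assumption for this sketch.
@Configuration
public class BinderContainerConfiguration {

    // The binder invokes this customizer for each listener container it creates,
    // passing the destination (topic) name and the consumer group.
    @Bean
    public ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> containerCustomizer() {
        return (container, destinationName, group) ->
                // Same poll timeout the question tried to set on its own factory.
                container.getContainerProperties().setPollTimeout(30000);
    }
}
```

Note that this only adjusts the container the binder has already built. Connection settings such as the broker address are, as far as I know, still resolved by the binder from its own properties (for example spring.cloud.stream.kafka.binder.brokers), so a hand-built ConsumerFactory like the one in the question is not consulted.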