Kafka Streams 以编程方式配置不起作用
Kafka Streams programmatically configuration not working
我正在做一个 spring 引导应用程序,我正在尝试以编程方式配置 kafka,但出于某种原因,我仍然从 application.yaml 获取属性,而不是我以编程方式设置的属性
@Configuration
public class KafkaConfiguration {
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(kafkaConsumerFactory());
factory.setConcurrency(1);
factory.getContainerProperties().setPollTimeout(30000);
return factory;
}
public ConsumerFactory<String, String> kafkaConsumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "aaa"); // should crash since is not valid
props.put(ConsumerConfig.GROUP_ID_CONFIG, "app1");
return new DefaultKafkaConsumerFactory<>(props);
}
}
@Component
public class StreamListener {
@StreamListener(TestStreams.TEST_STREAM_IN)
public void testStream(@Payload GenericCustomEvent response, @Headers MessageHeaders headers) throws Exception {
log.debug("Received generic event {} with headers {}", response, headers);
}
}
public interface TestStreams {
String TEST_STREAM_IN = "test-stream-in";
@Input(TEST_STREAM_IN)
SubscribableChannel inputTestStream();
}
@EnableBinding({TestStreams.class})
@SpringBootApplication
public class KafkaApplication {
public static void main(String[] args) {
SpringApplication.run(KafkaApplication .class, args);
}
}
活页夹不使用 KafkaListenerContainerFactory
,它从 yaml 创建容器本身。
您可以通过添加 ListenerContainerCustomizer
bean 来修改容器。
此处示例Can I apply graceful shutdown when using Spring Cloud Stream Kafka 3.0.3.RELEASE?
我正在做一个 spring 引导应用程序,我正在尝试以编程方式配置 kafka,但出于某种原因,我仍然从 application.yaml 获取属性,而不是我以编程方式设置的属性
@Configuration
public class KafkaConfiguration {
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(kafkaConsumerFactory());
factory.setConcurrency(1);
factory.getContainerProperties().setPollTimeout(30000);
return factory;
}
public ConsumerFactory<String, String> kafkaConsumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "aaa"); // should crash since is not valid
props.put(ConsumerConfig.GROUP_ID_CONFIG, "app1");
return new DefaultKafkaConsumerFactory<>(props);
}
}
@Component
public class StreamListener {
@StreamListener(TestStreams.TEST_STREAM_IN)
public void testStream(@Payload GenericCustomEvent response, @Headers MessageHeaders headers) throws Exception {
log.debug("Received generic event {} with headers {}", response, headers);
}
}
public interface TestStreams {
String TEST_STREAM_IN = "test-stream-in";
@Input(TEST_STREAM_IN)
SubscribableChannel inputTestStream();
}
@EnableBinding({TestStreams.class})
@SpringBootApplication
public class KafkaApplication {
public static void main(String[] args) {
SpringApplication.run(KafkaApplication .class, args);
}
}
活页夹不使用 KafkaListenerContainerFactory
,它从 yaml 创建容器本身。
您可以通过添加 ListenerContainerCustomizer
bean 来修改容器。
此处示例Can I apply graceful shutdown when using Spring Cloud Stream Kafka 3.0.3.RELEASE?