Using EmbeddedKafkaBroker with Spring Integration and Spring Kafka
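I want to use EmbeddedKafkaBroker to test a flow that involves KafkaMessageDrivenChannelAdapter. The consumer appears to start correctly and subscribes to the topic, but the handler is not triggered after I push a message to the EmbeddedKafkaBroker.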
    @SpringBootTest(properties = {"...."}, classes = {....class})
    @EmbeddedKafka
    class IntTests {

        @BeforeAll
        static void setup() {
            embeddedKafka = new EmbeddedKafkaBroker(1, true, TOPIC);
            embeddedKafka.kafkaPorts(57412);
            embeddedKafka.afterPropertiesSet();
        }

        @Test
        void testit() throws InterruptedException {
            String ip = embeddedKafka.getBrokersAsString();
            Map<String, Object> configs = new HashMap<>(KafkaTestUtils.producerProps(embeddedKafka));
            Producer<String, String> producer = new DefaultKafkaProducerFactory<>(configs, new StringSerializer(), new StringSerializer()).createProducer();
            // Act
            producer.send(new ProducerRecord<>(TOPIC, "key", "{\"name\":\"Test\"}"));
            producer.flush();
            ....
        }
        ...
    }
Main class:
    @Configuration
    public class Kafka {

        @Bean
        public KafkaMessageDrivenChannelAdapter<String, String> adapter(KafkaMessageListenerContainer<String, String> container) {
            KafkaMessageDrivenChannelAdapter<String, String> kafkaMessageDrivenChannelAdapter =..
            kafkaMessageDrivenChannelAdapter.setOutputChannelName("kafkaChannel");
        }

        @Bean
        public KafkaMessageListenerContainer<String, String> container() {
            ContainerProperties properties = new ContainerProperties(TOPIC);
            KafkaMessageListenerContainer<String, String> kafkaContainer = ...;
            return kafkaContainer;
        }

        @Bean
        public ConsumerFactory<String, String> consumerFactory() {
            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:57412");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "group12");
            ...
            return new DefaultKafkaConsumerFactory<>(props);
        }

        @Bean
        public PublishSubscribeChannel kafkaChannel() {
            return new PublishSubscribeChannel();
        }

        @Bean
        @ServiceActivator(inputChannel = "kafkaChannel")
        public MessageHandler handler() {
            return new MessageHandler() {
                @Override
                public void handleMessage(Message<?> message) throws MessagingException {
                }
            };
        }
        ...
    }
In the logs I do see:
clients.consumer.KafkaConsumer : [Consumer clientId=consumer-group12-1, groupId=group12] Subscribed to topic(s): TOPIC
ThreadPoolTaskScheduler : Initializing ExecutorService
KafkaMessageDrivenChannelAdapter : started bean 'adapter'; defined in: 'com.example.demo.demo.Kafka';
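With both embeddedKafka = new EmbeddedKafkaBroker(1, true, TOPIC); and @EmbeddedKafka, you are in effect starting two separate Kafka clusters. If you want to replace the random port of the embedded broker with a fixed one, see the ports option of @EmbeddedKafka. That said, it is better to rely on the auto-configuration that Spring Boot provides.
See the documentation for details: https://docs.spring.io/spring-boot/docs/current/reference/html/spring-boot-features.html#boot-features-embedded-kafka. Pay attention to the bootstrapServersProperty = "spring.kafka.bootstrap-servers" property.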
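To make the single-broker setup concrete, here is a minimal sketch of the test using only the broker that @EmbeddedKafka starts. It is an illustration under assumptions, not the asker's actual code: the literal topic name "TOPIC" stands in for the TOPIC constant whose value is not shown in the question, and assertions are left out. The hard-coded "127.0.0.1:57412" in consumerFactory() would also have to be replaced by the value of spring.kafka.bootstrap-servers, otherwise the consumer would still point at a port where nothing is listening.

```java
import java.util.Map;

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.KafkaTestUtils;

@SpringBootTest
// One broker only: the annotation starts it on a random port and publishes
// its address under the property that Spring Boot's Kafka support reads.
@EmbeddedKafka(topics = "TOPIC", // assumption: literal value of the TOPIC constant
        bootstrapServersProperty = "spring.kafka.bootstrap-servers")
class IntTests {

    // Injected from the test context; no manual "new EmbeddedKafkaBroker(...)".
    @Autowired
    private EmbeddedKafkaBroker embeddedKafka;

    @Test
    void testit() {
        // Producer properties point at the same broker the application consumes from.
        Map<String, Object> configs = KafkaTestUtils.producerProps(embeddedKafka);
        Producer<String, String> producer = new DefaultKafkaProducerFactory<>(
                configs, new StringSerializer(), new StringSerializer()).createProducer();

        // Act
        producer.send(new ProducerRecord<>("TOPIC", "key", "{\"name\":\"Test\"}"));
        producer.flush();
        producer.close();

        // Assertions on the handler are omitted; they depend on the elided parts of the flow.
    }
}
```

Because the producer and the application both resolve the broker address from the same embedded instance, there is no second cluster for the message to get lost in.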
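UPDATE
In your test you have @SpringBootTest(classes = {Kafka.class}). When I removed that classes attribute, everything started to work. The problem is that your configuration class is not auto-configuration aware, so Spring Integration is not initialized properly and messages are not consumed from the channel. There may be other side effects as well. Either way, it is better to rely on auto-configuration, so let your test see the @SpringBootApplication annotation.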
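As a rough sketch of what "let your test see the @SpringBootApplication annotation" could look like: a plain @SpringBootTest with no classes attribute searches upward from the test's package for a @SpringBootApplication (or @SpringBootConfiguration) class and loads the full auto-configured context from it. The class name DemoApplication is hypothetical; the package is taken from the log line that mentions 'com.example.demo.demo.Kafka', and the test is assumed to live in the same package or a sub-package.

```java
package com.example.demo.demo;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

// Hypothetical application class. @SpringBootApplication enables
// auto-configuration and component scanning of this package, so the Kafka
// @Configuration class is picked up together with Spring Integration's
// auto-configured infrastructure.
@SpringBootApplication
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }
}
```

With such a class in place, the test can be annotated with a bare @SpringBootTest instead of listing Kafka.class explicitly.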