Spring Boot: testing KafkaTemplate

I have a service that sends messages:

@Service
class ExportTaskService {

  @Autowired
  private KafkaTemplate<String, Object> template;
  
  public void exportNewTask(ImportTaskRequest req) {
    template.send("my-topic-name", req);
  }

}

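I have configured the beans consumerFactory, producerFactory, and kafkaTemplate (in src/main/java).

If I run the application and call the method, everything works and the message is sent to the real message broker.

Now I need a Spring test that calls ExportTaskService.exportNewTask(request) and then waits for the message on the same topic.

Here is my code, which does not work (I cannot receive the message):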

@RunWith(SpringRunner.class)
@SpringBootTest
@DirtiesContext
@TestPropertySource(locations="classpath:test.properties")
@EnableKafka
@EmbeddedKafka(
        topics = "new-bitrix-leads", ports = 9092
)
public class ExportingLeadTests {

    @Autowired
    private EmbeddedKafkaBroker embeddedKafkaBroker;

    @Autowired
    ExportTaskService exportTaskService;

    @Autowired
    ConsumerFactory<String, Object> consumerFactory;

    @Test
    public void test() throws InterruptedException {
        assert(embeddedKafkaBroker != null);
        assert(exportTaskService != null);

        Consumer<String, Object> consumer =  consumerFactory.createConsumer();
        consumer.subscribe(Collections.singletonList("new-bitrix-leads"));
        exportTaskService.exportNewTask(ImportTaskRequest.builder()
                .description("descr")
                .title("title")
                .build());
        ConsumerRecords<String, Object> records = consumer.poll(Duration.ofSeconds(3));
        assert (records.count() == 1);
    }
}

How can I read this message? What do I need to do? I'm out of ideas...

Big thanks :) !!

My simple solution is:

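// Imports assume JUnit 5, Hamcrest, and spring-kafka-test on the test classpath.
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.ContainerTestUtils;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.TestPropertySource;

@SpringBootTest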
@DirtiesContext
@TestPropertySource(locations="classpath:test.properties")
@EnableKafka
@EmbeddedKafka(
        topics = "new-bitrix-leads", ports = 9092
)
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class ExportingLeadTests {

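    // Records received on the listener thread are handed to the test thread through this queue.
    private BlockingQueue<ConsumerRecord<String, Object>> records;

    // Value type must match the consumer factory below, otherwise the assignment in setUp() does not compile.
    private KafkaMessageListenerContainer<String, Object> container;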

    @Autowired
    private EmbeddedKafkaBroker embeddedKafkaBroker;

    @Autowired
    ExportTaskService exportTaskService;

    @BeforeAll
    void setUp() {
        // Build a dedicated test consumer that points at the embedded broker.
        DefaultKafkaConsumerFactory<String, Object> consumerFactory = new DefaultKafkaConsumerFactory<>(getConsumerProperties());
        ContainerProperties containerProperties = new ContainerProperties("new-bitrix-leads");
        container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
        records = new LinkedBlockingQueue<>();
        // Every record the container receives is pushed into the queue.
        container.setupMessageListener((MessageListener<String, Object>) e -> records.add(e));
        container.start();
        // Block until partitions are assigned, so the consumer is ready before the test sends anything.
        ContainerTestUtils.waitForAssignment(container, embeddedKafkaBroker.getPartitionsPerTopic());
    }

    @AfterAll
    void tearDown() {
        container.stop();
    }

    private Map<String, Object> getConsumerProperties() {
        Map<String, Object> map = new HashMap<>();
        map.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafkaBroker.getBrokersAsString());
        map.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer");
        map.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        map.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "10");
        map.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "60000");
        map.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        map.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        map.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return map;
    }

    @Test
    public void test() throws InterruptedException {
        exportTaskService.exportNewTask(ImportTaskRequest.builder()
                .description("descr")
                .title("title")
                .gclid("gclid")
                .id("id")
                .name("my name")
                .website("http://website.com")
                .yclid("yclid")
                .source("Source")
                .build());
        ConsumerRecord<String, Object> record = records.poll(5, TimeUnit.SECONDS);
        assert (record != null);
        assertThat(record.value().toString(), containsString("gclid"));
    }
}

This really works. Nice.
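
For anyone wondering why the queue is there: the listener container receives records on its own thread, so the test needs a thread-safe hand-off and a bounded wait rather than a sleep. Below is a minimal sketch of just that pattern in plain Java, with no Kafka involved. QueueHandoffSketch, sendAndAwait, and the delay values are hypothetical names and numbers made up for illustration, and the background thread only stands in for the listener container.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the listener-plus-queue hand-off; not part of spring-kafka.
class QueueHandoffSketch {

    // Starts a background "listener" that delivers the payload after deliveryDelayMs,
    // then waits on the calling thread for at most timeoutMs.
    // Returns the payload, or null if nothing arrived in time.
    static String sendAndAwait(String payload, long deliveryDelayMs, long timeoutMs) {
        BlockingQueue<String> records = new LinkedBlockingQueue<>();

        Thread listener = new Thread(() -> {
            try {
                Thread.sleep(deliveryDelayMs); // simulated broker round trip
                records.add(payload);          // same role as records.add(e) in the listener
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        listener.setDaemon(true); // do not keep the JVM alive after a timeout
        listener.start();

        try {
            // Bounded wait: returns as soon as a record is available, or null on timeout.
            return records.poll(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for a record", e);
        }
    }

    public static void main(String[] args) {
        // Delivered well within the timeout.
        System.out.println(sendAndAwait("{\"gclid\":\"gclid\"}", 100, 2000));
        // Delivered too late, so the wait times out.
        System.out.println(sendAndAwait("late", 2000, 100));
    }
}
```

Because poll returns null on timeout, the test above checks record != null before reading the value.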