Spring Boot 测试 KafkaTemplate
Spring boot test KafkaTemplate
我有发送消息的服务
@Service
class ExportTaskService {
@Autowired
private KafkaTemplate<String, Object> template;
public void exportNewTask(ImportTaskRequest req) {
template.send('my-topic-name', req)
}
}
我配置了bean:consumerFactory, producerFactory, kafkaTemplate (src/main/java)
如果我运行应用程序并执行该方法——一切正常,消息会被发送到真实的消息代理中。
然后我需要编写一个 Spring 测试,它调用 ExportTaskService.exportNewTask(request),并等待来自同一主题的消息。
这是我的代码,但它不起作用(我无法接收到消息):
/**
 * Spring Boot test that sends a request through ExportTaskService and then polls a
 * consumer subscribed to "new-bitrix-leads", expecting exactly one record.
 *
 * NOTE(review): the accompanying text reports that this version does NOT receive the message.
 * NOTE(review): ExportTaskService as shown publishes to 'my-topic-name', while this test
 * subscribes to "new-bitrix-leads" — confirm that both sides use the same topic.
 */
@RunWith(SpringRunner.class)
@SpringBootTest
@DirtiesContext
@TestPropertySource(locations="classpath:test.properties")
@EnableKafka
@EmbeddedKafka(
topics = "new-bitrix-leads", ports = 9092
)
public class ExportingLeadTests {
@Autowired
private EmbeddedKafkaBroker embeddedKafkaBroker;
@Autowired
ExportTaskService exportTaskService;
// Consumer factory bean taken from the application configuration.
// NOTE(review): its bootstrap servers and auto.offset.reset are not visible here —
// presumably it has to point at the embedded broker; confirm against test.properties.
@Autowired
ConsumerFactory<String, Object> consumerFactory;
/**
 * Subscribes a consumer, publishes one request via the service, then polls once (up to 3 s)
 * and expects a single record.
 */
@Test
public void test() throws InterruptedException {
// Sanity checks on injection. The `assert` keyword is only evaluated when the JVM
// runs with assertions enabled (-ea).
assert(embeddedKafkaBroker != null);
assert(exportTaskService != null);
Consumer<String, Object> consumer = consumerFactory.createConsumer();
// NOTE(review): subscribe() only registers the subscription; partitions are assigned
// during poll(). The record below is therefore sent before the consumer has joined the
// group, and with auto.offset.reset=latest it would likely be skipped — confirm.
consumer.subscribe(Collections.singletonList("new-bitrix-leads"));
exportTaskService.exportNewTask(ImportTaskRequest.builder()
.description("descr")
.title("title")
.build());
// Single poll with a 3 second timeout; the consumer is never closed afterwards.
ConsumerRecords<String, Object> records = consumer.poll(Duration.ofSeconds(3));
assert (records.count() == 1);
}
}
如何读取这条消息?我需要做什么?我毫无头绪……
非常感谢 :) !!
我的简单解决方案是:
@SpringBootTest
@DirtiesContext
@TestPropertySource(locations="classpath:test.properties")
@EnableKafka
@EmbeddedKafka(
topics = "new-bitrix-leads", ports = 9092
)
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class ExportingLeadTests {
private BlockingQueue<ConsumerRecord<String, Object>> records;
private KafkaMessageListenerContainer<String, String> container;
@Autowired
private EmbeddedKafkaBroker embeddedKafkaBroker;
@Autowired
ExportTaskService exportTaskService;
@BeforeAll
void setUp() {
DefaultKafkaConsumerFactory<String, Object> consumerFactory = new DefaultKafkaConsumerFactory<>(getConsumerProperties());
ContainerProperties containerProperties = new ContainerProperties("new-bitrix-leads");
container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
records = new LinkedBlockingQueue<>();
container.setupMessageListener((MessageListener<String, Object>) e -> records.add(e));
container.start();
ContainerTestUtils.waitForAssignment(container, embeddedKafkaBroker.getPartitionsPerTopic());
}
@AfterAll
void tearDown() {
container.stop();
}
private Map<String, Object> getConsumerProperties() {
Map<String, Object> map = new HashMap<>();
map.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafkaBroker.getBrokersAsString());
map.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer");
map.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
map.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "10");
map.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "60000");
map.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
map.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
map.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return map;
}
@Test
public void test() throws InterruptedException {
exportTaskService.exportNewTask(ImportTaskRequest.builder()
.description("descr")
.title("title")
.gclid("gclid")
.id("id")
.name("my name")
.website("http://website.com")
.yclid("yclid")
.source("Source")
.build());
ConsumerRecord<String, Object> record = records.poll(5, TimeUnit.SECONDS);
assert (record != null);
assertThat(record.value().toString(), containsString("gclid"));
}
}
确实有效。不错
我有发送消息的服务
@Service
class ExportTaskService {
@Autowired
private KafkaTemplate<String, Object> template;
public void exportNewTask(ImportTaskRequest req) {
template.send('my-topic-name', req)
}
}
我配置了bean:consumerFactory, producerFactory, kafkaTemplate (src/main/java)
如果我运行应用程序并执行该方法——一切正常,消息会被发送到真实的消息代理中。
然后我需要编写一个 Spring 测试,它调用 ExportTaskService.exportNewTask(request),并等待来自同一主题的消息。
这是我的代码,但它不起作用(我无法接收到消息):
/**
 * Spring Boot test that sends a request through ExportTaskService and then polls a
 * consumer subscribed to "new-bitrix-leads", expecting exactly one record.
 *
 * NOTE(review): the accompanying text reports that this version does NOT receive the message.
 * NOTE(review): ExportTaskService as shown publishes to 'my-topic-name', while this test
 * subscribes to "new-bitrix-leads" — confirm that both sides use the same topic.
 */
@RunWith(SpringRunner.class)
@SpringBootTest
@DirtiesContext
@TestPropertySource(locations="classpath:test.properties")
@EnableKafka
@EmbeddedKafka(
topics = "new-bitrix-leads", ports = 9092
)
public class ExportingLeadTests {
@Autowired
private EmbeddedKafkaBroker embeddedKafkaBroker;
@Autowired
ExportTaskService exportTaskService;
// Consumer factory bean taken from the application configuration.
// NOTE(review): its bootstrap servers and auto.offset.reset are not visible here —
// presumably it has to point at the embedded broker; confirm against test.properties.
@Autowired
ConsumerFactory<String, Object> consumerFactory;
/**
 * Subscribes a consumer, publishes one request via the service, then polls once (up to 3 s)
 * and expects a single record.
 */
@Test
public void test() throws InterruptedException {
// Sanity checks on injection. The `assert` keyword is only evaluated when the JVM
// runs with assertions enabled (-ea).
assert(embeddedKafkaBroker != null);
assert(exportTaskService != null);
Consumer<String, Object> consumer = consumerFactory.createConsumer();
// NOTE(review): subscribe() only registers the subscription; partitions are assigned
// during poll(). The record below is therefore sent before the consumer has joined the
// group, and with auto.offset.reset=latest it would likely be skipped — confirm.
consumer.subscribe(Collections.singletonList("new-bitrix-leads"));
exportTaskService.exportNewTask(ImportTaskRequest.builder()
.description("descr")
.title("title")
.build());
// Single poll with a 3 second timeout; the consumer is never closed afterwards.
ConsumerRecords<String, Object> records = consumer.poll(Duration.ofSeconds(3));
assert (records.count() == 1);
}
}
如何读取这条消息?我需要做什么?我毫无头绪……
非常感谢 :) !!
我的简单解决方案是:
@SpringBootTest
@DirtiesContext
@TestPropertySource(locations="classpath:test.properties")
@EnableKafka
@EmbeddedKafka(
topics = "new-bitrix-leads", ports = 9092
)
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class ExportingLeadTests {
private BlockingQueue<ConsumerRecord<String, Object>> records;
private KafkaMessageListenerContainer<String, String> container;
@Autowired
private EmbeddedKafkaBroker embeddedKafkaBroker;
@Autowired
ExportTaskService exportTaskService;
@BeforeAll
void setUp() {
DefaultKafkaConsumerFactory<String, Object> consumerFactory = new DefaultKafkaConsumerFactory<>(getConsumerProperties());
ContainerProperties containerProperties = new ContainerProperties("new-bitrix-leads");
container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
records = new LinkedBlockingQueue<>();
container.setupMessageListener((MessageListener<String, Object>) e -> records.add(e));
container.start();
ContainerTestUtils.waitForAssignment(container, embeddedKafkaBroker.getPartitionsPerTopic());
}
@AfterAll
void tearDown() {
container.stop();
}
private Map<String, Object> getConsumerProperties() {
Map<String, Object> map = new HashMap<>();
map.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafkaBroker.getBrokersAsString());
map.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer");
map.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
map.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "10");
map.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "60000");
map.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
map.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
map.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return map;
}
@Test
public void test() throws InterruptedException {
exportTaskService.exportNewTask(ImportTaskRequest.builder()
.description("descr")
.title("title")
.gclid("gclid")
.id("id")
.name("my name")
.website("http://website.com")
.yclid("yclid")
.source("Source")
.build());
ConsumerRecord<String, Object> record = records.poll(5, TimeUnit.SECONDS);
assert (record != null);
assertThat(record.value().toString(), containsString("gclid"));
}
}
确实有效。不错