在使用自定义通道绑定的 Spring Cloud Stream 测试中使用嵌入式 Kafka
Using embedded Kafka in spring cloud stream test with custom channel bindings
我有一个 Spring Boot 应用程序,其中使用 spring-cloud-stream 从一个 Kafka 主题消费消息,进行一些处理后发布到另一个 Kafka 主题。该应用程序运行良好,我编写的单元测试(使用 TestBinder)也运行良好。
我现在正在尝试使用嵌入式 Kafka 编写集成测试,以测试端到端功能。我已经按照这里的示例 https://github.com/spring-cloud/spring-cloud-stream-samples/blob/master/testing-samples/test-embedded-kafka/src/test/java/demo/EmbeddedKafkaApplicationTests.java 编写了测试,但它不起作用 - 我在输出主题上收不到任何消息。
application.yml
# Spring Cloud Stream bindings: each key under "bindings" is a channel name
# and maps that channel to a destination (Kafka topic).
spring:
  cloud:
    stream:
      bindings:
        # Inbound channel: consumes from the ReadyForProcessing topic.
        incoming-message:
          destination: ReadyForProcessing
          content-type: application/json
          group: ReadyForProcessingGroup
        # Outbound channel: publishes to the TransactionSettled topic.
        outgoing-message:
          destination: TransactionSettled
          content-type: application/json
TransformerBinding.java
/**
 * Channel contract for the event transformer: one inbound channel and one
 * outbound channel, each identified by a binding name constant.
 */
public interface TransformerBinding {

    /** Binding name of the inbound channel. */
    String INCOMING_MESSAGE = "incoming-message";

    /**
     * @return the subscribable channel registered under {@link #INCOMING_MESSAGE}
     */
    @Input(INCOMING_MESSAGE)
    SubscribableChannel incomingMessage();

    /** Binding name of the outbound channel. */
    String OUTGOING_MESSAGE = "outgoing-message";

    /**
     * @return the message channel registered under {@link #OUTGOING_MESSAGE}
     */
    @Output(OUTGOING_MESSAGE)
    MessageChannel outgoingMessage();
}
EventProcessor.java
/**
 * Transforms events arriving on the incoming channel into settled-transaction
 * events that are sent to the outgoing channel.
 */
@Service
@EnableBinding(TransformerBinding.class)
@Slf4j
@AllArgsConstructor
public class EventProcessor {

    /**
     * Builds a {@code TransactionSettledEvent} carrying the same transaction
     * reference as the received event and the status {@code "Settled"}.
     *
     * @param readyForProcessingEvent payload received on the incoming channel
     * @return the event to publish on the outgoing channel
     */
    @Transformer(
            inputChannel = TransformerBinding.INCOMING_MESSAGE,
            outputChannel = TransformerBinding.OUTGOING_MESSAGE)
    public TransactionSettledEvent transform(@Payload final ReadyForProcessingEvent readyForProcessingEvent) {
        log.info("Event received in processor: {}", readyForProcessingEvent);

        final String transactionRef = readyForProcessingEvent.getTransactionRef();
        return TransactionSettledEvent.builder()
                .transactionRef(transactionRef)
                .status("Settled")
                .build();
    }
}
EventProcessorTest.java
/**
 * End-to-end test of the event processor against an embedded Kafka broker:
 * publishes one event to the input topic and expects exactly one settled
 * event on the output topic.
 */
@RunWith(SpringRunner.class)
@SpringBootTest(properties = "spring.autoconfigure.exclude="
        + "org.springframework.cloud.stream.test.binder.TestSupportBinderAutoConfiguration")
public class EventProcessorIT {

    private static final String INPUT_TOPIC = "ReadyForProcessing";
    private static final String OUTPUT_TOPIC = "TransactionSettled";
    private static final String CONSUMER_GROUP = "TestConsumerGroup";

    @Autowired
    private ObjectMapper mapper;

    /** Single embedded broker with both topics declared up front. */
    @ClassRule
    public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, INPUT_TOPIC, OUTPUT_TOPIC);

    /** Points the Kafka binder at the embedded broker's address. */
    @BeforeClass
    public static void setup() {
        System.setProperty("spring.cloud.stream.kafka.binder.brokers",
                embeddedKafka.getEmbeddedKafka().getBrokersAsString());
    }

    /**
     * Sends a {@code ReadyForProcessingEvent} to the input topic and verifies
     * that a single event with status {@code "Settled"} appears on the output topic.
     */
    @Test
    public void testSendReceive() {
        // Publish the input event.
        Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka.getEmbeddedKafka());
        senderProps.put("key.serializer", StringSerializer.class);
        senderProps.put("value.serializer", JsonSerializer.class);
        DefaultKafkaProducerFactory<String, ReadyForProcessingEvent> pf = new DefaultKafkaProducerFactory<>(senderProps);
        KafkaTemplate<String, ReadyForProcessingEvent> template = new KafkaTemplate<>(pf, true);
        template.setDefaultTopic(INPUT_TOPIC);
        template.sendDefault(ReadyForProcessingEvent.builder().transactionRef("123456").build());

        // Configure a consumer for the output topic.
        Map<String, Object> consumerProps =
                KafkaTestUtils.consumerProps(CONSUMER_GROUP, "false", embeddedKafka.getEmbeddedKafka());
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP);
        consumerProps.put("key.deserializer", StringDeserializer.class);
        consumerProps.put("value.deserializer", JsonDeserializer.class);
        DefaultKafkaConsumerFactory<String, TransactionSettledEvent> cf = new DefaultKafkaConsumerFactory<>(consumerProps);

        // try-with-resources so the consumer is closed even when an assertion fails.
        try (Consumer<String, TransactionSettledEvent> consumer = cf.createConsumer()) {
            // A bare subscribe() followed by poll(0) returns before partitions are
            // assigned and therefore sees no records. This helper subscribes and
            // waits for the assignment instead.
            embeddedKafka.getEmbeddedKafka().consumeFromAnEmbeddedTopic(consumer, OUTPUT_TOPIC);

            // getSingleRecord polls with a timeout and fails unless exactly one
            // record is received, which replaces the former explicit count check.
            final TransactionSettledEvent transactionSettledEvent = this.mapper.convertValue(
                    KafkaTestUtils.getSingleRecord(consumer, OUTPUT_TOPIC).value(), TransactionSettledEvent.class);
            consumer.commitSync();

            assertEquals("Output event not as expected", "Settled", transactionSettledEvent.getStatus());
        }
    }
}
上面的测试失败了,因为我期望收到 1 条记录,但在输出主题中得到了 0 条记录。
ConsumerRecords<String, TransactionSettledEvent> records = consumer.poll(0);
您需要等待订阅完成;超时设为 0 是不够的;示例中最多等待 10 秒。
但是,使用
embeddedKafkaRule().getEmbeddedKafka().consumeFromAnEmbeddedTopic(...);
更安全,因为它使用 ConsumerRebalanceListener 可靠地等待分区分配完成。
订阅后,您还可以使用
KafkaTestUtils.getSingleRecord(Consumer<K, V> consumer, String topic);
获取记录(如果您只期望一条记录;否则使用 getRecords(...))。
我有一个 Spring Boot 应用程序,其中使用 spring-cloud-stream 从一个 Kafka 主题消费消息,进行一些处理后发布到另一个 Kafka 主题。该应用程序运行良好,我编写的单元测试(使用 TestBinder)也运行良好。
我现在正在尝试使用嵌入式 Kafka 编写集成测试,以测试端到端功能。我已经按照这里的示例 https://github.com/spring-cloud/spring-cloud-stream-samples/blob/master/testing-samples/test-embedded-kafka/src/test/java/demo/EmbeddedKafkaApplicationTests.java 编写了测试,但它不起作用 - 我在输出主题上收不到任何消息。
application.yml
# Spring Cloud Stream bindings: each key under "bindings" is a channel name
# and maps that channel to a destination (Kafka topic).
spring:
  cloud:
    stream:
      bindings:
        # Inbound channel: consumes from the ReadyForProcessing topic.
        incoming-message:
          destination: ReadyForProcessing
          content-type: application/json
          group: ReadyForProcessingGroup
        # Outbound channel: publishes to the TransactionSettled topic.
        outgoing-message:
          destination: TransactionSettled
          content-type: application/json
TransformerBinding.java
/**
 * Channel contract for the event transformer: one inbound channel and one
 * outbound channel, each identified by a binding name constant.
 */
public interface TransformerBinding {

    /** Binding name of the inbound channel. */
    String INCOMING_MESSAGE = "incoming-message";

    /**
     * @return the subscribable channel registered under {@link #INCOMING_MESSAGE}
     */
    @Input(INCOMING_MESSAGE)
    SubscribableChannel incomingMessage();

    /** Binding name of the outbound channel. */
    String OUTGOING_MESSAGE = "outgoing-message";

    /**
     * @return the message channel registered under {@link #OUTGOING_MESSAGE}
     */
    @Output(OUTGOING_MESSAGE)
    MessageChannel outgoingMessage();
}
EventProcessor.java
/**
 * Transforms events arriving on the incoming channel into settled-transaction
 * events that are sent to the outgoing channel.
 */
@Service
@EnableBinding(TransformerBinding.class)
@Slf4j
@AllArgsConstructor
public class EventProcessor {

    /**
     * Builds a {@code TransactionSettledEvent} carrying the same transaction
     * reference as the received event and the status {@code "Settled"}.
     *
     * @param readyForProcessingEvent payload received on the incoming channel
     * @return the event to publish on the outgoing channel
     */
    @Transformer(
            inputChannel = TransformerBinding.INCOMING_MESSAGE,
            outputChannel = TransformerBinding.OUTGOING_MESSAGE)
    public TransactionSettledEvent transform(@Payload final ReadyForProcessingEvent readyForProcessingEvent) {
        log.info("Event received in processor: {}", readyForProcessingEvent);

        final String transactionRef = readyForProcessingEvent.getTransactionRef();
        return TransactionSettledEvent.builder()
                .transactionRef(transactionRef)
                .status("Settled")
                .build();
    }
}
EventProcessorTest.java
/**
 * End-to-end test of the event processor against an embedded Kafka broker:
 * publishes one event to the input topic and expects exactly one settled
 * event on the output topic.
 */
@RunWith(SpringRunner.class)
@SpringBootTest(properties = "spring.autoconfigure.exclude="
        + "org.springframework.cloud.stream.test.binder.TestSupportBinderAutoConfiguration")
public class EventProcessorIT {

    private static final String INPUT_TOPIC = "ReadyForProcessing";
    private static final String OUTPUT_TOPIC = "TransactionSettled";
    private static final String CONSUMER_GROUP = "TestConsumerGroup";

    @Autowired
    private ObjectMapper mapper;

    /** Single embedded broker with both topics declared up front. */
    @ClassRule
    public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, INPUT_TOPIC, OUTPUT_TOPIC);

    /** Points the Kafka binder at the embedded broker's address. */
    @BeforeClass
    public static void setup() {
        System.setProperty("spring.cloud.stream.kafka.binder.brokers",
                embeddedKafka.getEmbeddedKafka().getBrokersAsString());
    }

    /**
     * Sends a {@code ReadyForProcessingEvent} to the input topic and verifies
     * that a single event with status {@code "Settled"} appears on the output topic.
     */
    @Test
    public void testSendReceive() {
        // Publish the input event.
        Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka.getEmbeddedKafka());
        senderProps.put("key.serializer", StringSerializer.class);
        senderProps.put("value.serializer", JsonSerializer.class);
        DefaultKafkaProducerFactory<String, ReadyForProcessingEvent> pf = new DefaultKafkaProducerFactory<>(senderProps);
        KafkaTemplate<String, ReadyForProcessingEvent> template = new KafkaTemplate<>(pf, true);
        template.setDefaultTopic(INPUT_TOPIC);
        template.sendDefault(ReadyForProcessingEvent.builder().transactionRef("123456").build());

        // Configure a consumer for the output topic.
        Map<String, Object> consumerProps =
                KafkaTestUtils.consumerProps(CONSUMER_GROUP, "false", embeddedKafka.getEmbeddedKafka());
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP);
        consumerProps.put("key.deserializer", StringDeserializer.class);
        consumerProps.put("value.deserializer", JsonDeserializer.class);
        DefaultKafkaConsumerFactory<String, TransactionSettledEvent> cf = new DefaultKafkaConsumerFactory<>(consumerProps);

        // try-with-resources so the consumer is closed even when an assertion fails.
        try (Consumer<String, TransactionSettledEvent> consumer = cf.createConsumer()) {
            // A bare subscribe() followed by poll(0) returns before partitions are
            // assigned and therefore sees no records. This helper subscribes and
            // waits for the assignment instead.
            embeddedKafka.getEmbeddedKafka().consumeFromAnEmbeddedTopic(consumer, OUTPUT_TOPIC);

            // getSingleRecord polls with a timeout and fails unless exactly one
            // record is received, which replaces the former explicit count check.
            final TransactionSettledEvent transactionSettledEvent = this.mapper.convertValue(
                    KafkaTestUtils.getSingleRecord(consumer, OUTPUT_TOPIC).value(), TransactionSettledEvent.class);
            consumer.commitSync();

            assertEquals("Output event not as expected", "Settled", transactionSettledEvent.getStatus());
        }
    }
}
上面的测试失败了,因为我期望收到 1 条记录,但在输出主题中得到了 0 条记录。
ConsumerRecords<String, TransactionSettledEvent> records = consumer.poll(0);
您需要等待订阅完成;超时设为 0 是不够的;示例中最多等待 10 秒。
但是,使用
embeddedKafkaRule().getEmbeddedKafka().consumeFromAnEmbeddedTopic(...);
更安全,因为它使用 ConsumerRebalanceListener 可靠地等待分区分配完成。
订阅后,您还可以使用
KafkaTestUtils.getSingleRecord(Consumer<K, V> consumer, String topic);
获取记录(如果您只期望一条记录;否则使用 getRecords(...))。