Kafka 测试容器片状测试
Kafka test container flaky tests
将 kafka 集成到我的 spring 项目中。已经使用 TestContainer 方法编写了一个集成,但测试有时会失败。 kafka 服务器的初始化似乎有些问题。
下面是我的代码
def setupSpec() {
kafka = new KafkaContainer()
kafka.start()
System.setProperty("kafka.consumer.endpoint", kafka.bootstrapServers.replace("PLAINTEXT://", ""))
}
def setup() {
RestAssured.port = port
}
def "test profile update events"() {
given:
String INPUT_TOPIC = "EventXX"
when:
KafkaProducer<String, String> kafkaProducer = createProducer()
kafkaProducer.send(new ProducerRecord<>(INPUT_TOPIC, "foo")).get().topic()
kafkaProducer.send(new ProducerRecord<>(INPUT_TOPIC, "foo")).get().topic()
kafkaProducer.send(new ProducerRecord<>(INPUT_TOPIC, "foo")).get().topic()
kafkaProducer.send(new ProducerRecord<>(INPUT_TOPIC, "foo")).get().topic()
then:
TestUtil.waitFor({ EventConsumer.msgConsumed == true }, 5000)
kafkaProducer.close()
}
现在有趣的是,如果我在测试中发送消息之前添加 Thread.sleep(10000) 它总是有效,但这方法对我来说看起来有点脏。我们如何确保 kafka 服务器已启动并且 运行 在 运行 任何测试之前。我通过验证 setupSpec[=25 中的 kafkaSendRecieve 尝试了以下方法=] 但失败了。我正在粘贴下面的代码
def validatekafkaSendRecieve() {
def started = false
String INPUT_TOPIC = "kafkaTest"
def producer = createProducer()
def consumer = createConsumer(INPUT_TOPIC)
Thread.sleep(9000)
while (!started) {
producer.send(new ProducerRecord<>(INPUT_TOPIC, "foo")).get()
started = consumeMessage(INPUT_TOPIC, consumer)
}
producer.close()
consumer.close()
}
def consumeMessage(String topic, KafkaConsumer kafkaConsumer) {
def message = kafkaConsumer.poll(3)
if (!message.isEmpty()) {
def messageList = message.records(topic).asList()
if (messageList != null && !message.isEmpty()) {
return true
}
} else {
return false
}
}
您是否预先创建了您制作的主题?尝试这样做。不确定这是否是您面临的问题(需要日志),但是当自动创建主题时,需要一些时间才能为其所有分区分配领导者。
将 kafka 集成到我的 spring 项目中。已经使用 TestContainer 方法编写了一个集成,但测试有时会失败。 kafka 服务器的初始化似乎有些问题。
下面是我的代码
def setupSpec() {
kafka = new KafkaContainer()
kafka.start()
System.setProperty("kafka.consumer.endpoint", kafka.bootstrapServers.replace("PLAINTEXT://", ""))
}
def setup() {
RestAssured.port = port
}
def "test profile update events"() {
given:
String INPUT_TOPIC = "EventXX"
when:
KafkaProducer<String, String> kafkaProducer = createProducer()
kafkaProducer.send(new ProducerRecord<>(INPUT_TOPIC, "foo")).get().topic()
kafkaProducer.send(new ProducerRecord<>(INPUT_TOPIC, "foo")).get().topic()
kafkaProducer.send(new ProducerRecord<>(INPUT_TOPIC, "foo")).get().topic()
kafkaProducer.send(new ProducerRecord<>(INPUT_TOPIC, "foo")).get().topic()
then:
TestUtil.waitFor({ EventConsumer.msgConsumed == true }, 5000)
kafkaProducer.close()
}
现在有趣的是,如果我在测试中发送消息之前添加 Thread.sleep(10000) 它总是有效,但这方法对我来说看起来有点脏。我们如何确保 kafka 服务器已启动并且 运行 在 运行 任何测试之前。我通过验证 setupSpec[=25 中的 kafkaSendRecieve 尝试了以下方法=] 但失败了。我正在粘贴下面的代码
def validatekafkaSendRecieve() {
def started = false
String INPUT_TOPIC = "kafkaTest"
def producer = createProducer()
def consumer = createConsumer(INPUT_TOPIC)
Thread.sleep(9000)
while (!started) {
producer.send(new ProducerRecord<>(INPUT_TOPIC, "foo")).get()
started = consumeMessage(INPUT_TOPIC, consumer)
}
producer.close()
consumer.close()
}
def consumeMessage(String topic, KafkaConsumer kafkaConsumer) {
def message = kafkaConsumer.poll(3)
if (!message.isEmpty()) {
def messageList = message.records(topic).asList()
if (messageList != null && !message.isEmpty()) {
return true
}
} else {
return false
}
}
您是否预先创建了您制作的主题?尝试这样做。不确定这是否是您面临的问题(需要日志),但是当自动创建主题时,需要一些时间才能为其所有分区分配领导者。