如何使用 EmbeddedKafkaRule/EmbeddedKafka 设置 Spring Kafka 测试来修复 TopicExistsException 间歇性错误?
How to set up Spring Kafka test using EmbeddedKafkaRule/ EmbeddedKafka to fix TopicExistsException Intermittent Error?
我在测试我的 Kafka 消费者和生产者时遇到了问题。集成测试间歇性失败 TopicExistsException
.
这就是我当前的测试 class - UserEventListenerTest
对一位消费者的看法:
@SpringBootTest(properties = ["application.kafka.user-event-topic=user-event-topic-UserEventListenerTest",
"application.kafka.bootstrap=localhost:2345"])
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class UserEventListenerTest {
private val logger: Logger = LoggerFactory.getLogger(javaClass)
@Value("${application.kafka.user-event-topic}")
private lateinit var userEventTopic: String
@Autowired
private lateinit var kafkaConfigProperties: KafkaConfigProperties
private lateinit var embeddedKafka: EmbeddedKafkaRule
private lateinit var sender: KafkaSender<String, UserEvent>
private lateinit var receiver: KafkaReceiver<String, UserEvent>
@BeforeAll
fun setup() {
embeddedKafka = EmbeddedKafkaRule(1, false, userEventTopic)
embeddedKafka.kafkaPorts(kafkaConfigProperties.bootstrap.substringAfterLast(":").toInt())
embeddedKafka.before()
val producerProps: HashMap<String, Any> = hashMapOf(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to kafkaConfigProperties.bootstrap,
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to "org.apache.kafka.common.serialization.StringSerializer",
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to "com.project.userservice.config.MockAvroSerializer"
)
val senderOptions = SenderOptions.create<String, UserEvent>(producerProps)
sender = KafkaSender.create(senderOptions)
val consumerProps: HashMap<String, Any> = hashMapOf(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to kafkaConfigProperties.bootstrap,
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to "org.apache.kafka.common.serialization.StringDeserializer",
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to kafkaConfigProperties.deserializer,
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest",
"schema.registry.url" to kafkaConfigProperties.schemaRegistry,
ConsumerConfig.GROUP_ID_CONFIG to "test-consumer"
)
val receiverOptions = ReceiverOptions.create<String, UserEvent>(consumerProps)
.subscription(Collections.singleton("some-topic-after-UserEvent"))
receiver = KafkaReceiver.create(receiverOptions)
}
}
// Some tests
// Not shown as they are irrelevant
...
...
...
UserEventListener
class 使用来自 user-event-topic-UserEventListenerTest
的消息并向 some-topic-after-UserEvent
发布消息。
正如您从设置中看到的那样,我有一个测试生产者将向 user-event-topic-UserEventListenerTest
发布消息,以便我可以测试 UserEventListener
是否使用该消息和一个测试消费者将使用来自 some-topic-after-UserEvent
的消息,以便我可以查看 UserEventListener
是否在处理记录后向 some-topic-after-UserEvent
发布消息。
KafkaConfigProperties
class如下
@Component
@ConfigurationProperties(prefix = "application.kafka")
data class KafkaConfigProperties(
var bootstrap: String = "",
var schemaRegistry: String = "",
var deserializer: String = "",
var userEventTopic: String = "",
)
application.yml
看起来像这样。
application:
kafka:
user-event-topic: "platform.user-events.v1"
bootstrap: "localhost:9092"
schema-registry: "http://localhost:8081"
deserializer: com.project.userservice.config.MockAvroDeserializer
错误日志
com.project.userservice.user.UserEventListenerTest > initializationError FAILED
kafka.common.KafkaException:
at org.springframework.kafka.test.EmbeddedKafkaBroker.createTopics(EmbeddedKafkaBroker.java:354)
at org.springframework.kafka.test.EmbeddedKafkaBroker.lambda$createKafkaTopics(EmbeddedKafkaBroker.java:341)
at org.springframework.kafka.test.EmbeddedKafkaBroker.doWithAdmin(EmbeddedKafkaBroker.java:368)
at org.springframework.kafka.test.EmbeddedKafkaBroker.createKafkaTopics(EmbeddedKafkaBroker.java:340)
at org.springframework.kafka.test.EmbeddedKafkaBroker.afterPropertiesSet(EmbeddedKafkaBroker.java:284)
at org.springframework.kafka.test.rule.EmbeddedKafkaRule.before(EmbeddedKafkaRule.java:114)
at com.project.userservice.user.UserEventListenerTest.setup(UserEventListenerTest.kt:62)
Caused by:
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TopicExistsException: Topic 'user-event-topic-UserEventListenerTest' already exists.
at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
at org.apache.kafka.common.internals.KafkaFutureImpl.access[=14=]0(KafkaFutureImpl.java:32)
at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:104)
at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:272)
at org.springframework.kafka.test.EmbeddedKafkaBroker.createTopics(EmbeddedKafkaBroker.java:351)
... 6 more
Caused by:
org.apache.kafka.common.errors.TopicExistsException: Topic 'user-event-topic-UserEventListenerTest' already exists.
我尝试过的:
- 通过指定 bootstrap 配置,在每个测试中使用不同的 bootstrap 服务器地址,例如
@SpringBootTest(properties = ["application.kafka.bootstrap=localhost:2345"])
- 通过
@SpringBootTest
覆盖主题配置,在每个测试中使用不同的主题名称,就像上一个要点 bootstrap 服务器覆盖一样
- 在每个测试中添加
@DirtiesContext
class
包版本
- 科特林 1.3.61
- Spring 启动 - 2.2.3.RELEASE
- io.projectreactor.kafka:reactor-kafka:1.2.2.RELEASE
- org.springframework.kafka:spring-kafka-test:2.3.4.RELEASE(仅测试实施)
问题
我有多个使用 EmbeddedKafkaRule
的测试 class 并且设置大致相同。对于它们中的每一个,我都指定了不同的 kafka bootstrap 服务器地址和主题名称,但我仍然间歇性地看到 TopicAlreadyExists 异常。
如何使我的测试class保持一致?
I specify different kafka bootstrap server address and topic names, but I still see the TopicAlreadyExists exceptions intermittently
这没有意义;如果他们每次都有一个新端口,尤其是新主题名称,则该主题不可能已经存在。
一些建议:
- 由于您使用的是 JUnit5,因此请不要使用 JUnit4
EmbeddedKafkaRule
,而应使用 EmbeddedKafkaBroker
;或者简单地添加 @EmbeddedKafka
,代理将作为一个 bean 添加到 Spring 应用程序上下文中,其生命周期由 Spring 管理(使用 @DirtiesContext
销毁);对于非 Spring 测试,代理将由 JUnit5 EmbeddedKafkaCondition
创建(和销毁)并可通过 EmbeddedKafkaCondition.getBroker()
. 获得
- 不要使用显式端口;让代理使用其默认的随机端口并使用
embeddedKafka.getBrokersAsString()
作为 bootstrap 服务器 属性.
- 如果您必须自己管理经纪人(在
@BeforeAll
中),destroy()
他们在 @AfterAll
中。
我在测试我的 Kafka 消费者和生产者时遇到了问题。集成测试间歇性失败 TopicExistsException
.
这就是我当前的测试 class - UserEventListenerTest
对一位消费者的看法:
@SpringBootTest(properties = ["application.kafka.user-event-topic=user-event-topic-UserEventListenerTest",
"application.kafka.bootstrap=localhost:2345"])
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class UserEventListenerTest {
private val logger: Logger = LoggerFactory.getLogger(javaClass)
@Value("${application.kafka.user-event-topic}")
private lateinit var userEventTopic: String
@Autowired
private lateinit var kafkaConfigProperties: KafkaConfigProperties
private lateinit var embeddedKafka: EmbeddedKafkaRule
private lateinit var sender: KafkaSender<String, UserEvent>
private lateinit var receiver: KafkaReceiver<String, UserEvent>
@BeforeAll
fun setup() {
embeddedKafka = EmbeddedKafkaRule(1, false, userEventTopic)
embeddedKafka.kafkaPorts(kafkaConfigProperties.bootstrap.substringAfterLast(":").toInt())
embeddedKafka.before()
val producerProps: HashMap<String, Any> = hashMapOf(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to kafkaConfigProperties.bootstrap,
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to "org.apache.kafka.common.serialization.StringSerializer",
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to "com.project.userservice.config.MockAvroSerializer"
)
val senderOptions = SenderOptions.create<String, UserEvent>(producerProps)
sender = KafkaSender.create(senderOptions)
val consumerProps: HashMap<String, Any> = hashMapOf(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to kafkaConfigProperties.bootstrap,
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to "org.apache.kafka.common.serialization.StringDeserializer",
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to kafkaConfigProperties.deserializer,
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest",
"schema.registry.url" to kafkaConfigProperties.schemaRegistry,
ConsumerConfig.GROUP_ID_CONFIG to "test-consumer"
)
val receiverOptions = ReceiverOptions.create<String, UserEvent>(consumerProps)
.subscription(Collections.singleton("some-topic-after-UserEvent"))
receiver = KafkaReceiver.create(receiverOptions)
}
}
// Some tests
// Not shown as they are irrelevant
...
...
...
UserEventListener
class 使用来自 user-event-topic-UserEventListenerTest
的消息并向 some-topic-after-UserEvent
发布消息。
正如您从设置中看到的那样,我有一个测试生产者将向 user-event-topic-UserEventListenerTest
发布消息,以便我可以测试 UserEventListener
是否使用该消息和一个测试消费者将使用来自 some-topic-after-UserEvent
的消息,以便我可以查看 UserEventListener
是否在处理记录后向 some-topic-after-UserEvent
发布消息。
KafkaConfigProperties
class如下
@Component
@ConfigurationProperties(prefix = "application.kafka")
data class KafkaConfigProperties(
var bootstrap: String = "",
var schemaRegistry: String = "",
var deserializer: String = "",
var userEventTopic: String = "",
)
application.yml
看起来像这样。
application:
kafka:
user-event-topic: "platform.user-events.v1"
bootstrap: "localhost:9092"
schema-registry: "http://localhost:8081"
deserializer: com.project.userservice.config.MockAvroDeserializer
错误日志
com.project.userservice.user.UserEventListenerTest > initializationError FAILED
kafka.common.KafkaException:
at org.springframework.kafka.test.EmbeddedKafkaBroker.createTopics(EmbeddedKafkaBroker.java:354)
at org.springframework.kafka.test.EmbeddedKafkaBroker.lambda$createKafkaTopics(EmbeddedKafkaBroker.java:341)
at org.springframework.kafka.test.EmbeddedKafkaBroker.doWithAdmin(EmbeddedKafkaBroker.java:368)
at org.springframework.kafka.test.EmbeddedKafkaBroker.createKafkaTopics(EmbeddedKafkaBroker.java:340)
at org.springframework.kafka.test.EmbeddedKafkaBroker.afterPropertiesSet(EmbeddedKafkaBroker.java:284)
at org.springframework.kafka.test.rule.EmbeddedKafkaRule.before(EmbeddedKafkaRule.java:114)
at com.project.userservice.user.UserEventListenerTest.setup(UserEventListenerTest.kt:62)
Caused by:
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TopicExistsException: Topic 'user-event-topic-UserEventListenerTest' already exists.
at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
at org.apache.kafka.common.internals.KafkaFutureImpl.access[=14=]0(KafkaFutureImpl.java:32)
at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:104)
at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:272)
at org.springframework.kafka.test.EmbeddedKafkaBroker.createTopics(EmbeddedKafkaBroker.java:351)
... 6 more
Caused by:
org.apache.kafka.common.errors.TopicExistsException: Topic 'user-event-topic-UserEventListenerTest' already exists.
我尝试过的:
- 通过指定 bootstrap 配置,在每个测试中使用不同的 bootstrap 服务器地址,例如
@SpringBootTest(properties = ["application.kafka.bootstrap=localhost:2345"])
- 通过
@SpringBootTest
覆盖主题配置,在每个测试中使用不同的主题名称,就像上一个要点 bootstrap 服务器覆盖一样 - 在每个测试中添加
@DirtiesContext
class
包版本
- 科特林 1.3.61
- Spring 启动 - 2.2.3.RELEASE
- io.projectreactor.kafka:reactor-kafka:1.2.2.RELEASE
- org.springframework.kafka:spring-kafka-test:2.3.4.RELEASE(仅测试实施)
问题
我有多个使用 EmbeddedKafkaRule
的测试 class 并且设置大致相同。对于它们中的每一个,我都指定了不同的 kafka bootstrap 服务器地址和主题名称,但我仍然间歇性地看到 TopicAlreadyExists 异常。
如何使我的测试class保持一致?
I specify different kafka bootstrap server address and topic names, but I still see the TopicAlreadyExists exceptions intermittently
这没有意义;如果他们每次都有一个新端口,尤其是新主题名称,则该主题不可能已经存在。
一些建议:
- 由于您使用的是 JUnit5,因此请不要使用 JUnit4
EmbeddedKafkaRule
,而应使用EmbeddedKafkaBroker
;或者简单地添加@EmbeddedKafka
,代理将作为一个 bean 添加到 Spring 应用程序上下文中,其生命周期由 Spring 管理(使用@DirtiesContext
销毁);对于非 Spring 测试,代理将由 JUnit5EmbeddedKafkaCondition
创建(和销毁)并可通过EmbeddedKafkaCondition.getBroker()
. 获得
- 不要使用显式端口;让代理使用其默认的随机端口并使用
embeddedKafka.getBrokersAsString()
作为 bootstrap 服务器 属性. - 如果您必须自己管理经纪人(在
@BeforeAll
中),destroy()
他们在@AfterAll
中。