Spring卡夫卡。非阻塞重试 - 即使 autoCreateTopics 属性 为 false 也会创建主题
Spring Kafka. Non-blocking retries - topics are created even when autoCreateTopics property is false
我正在尝试使用具有非阻塞重试功能的 spring-kafka 2.8.0 为我的项目实现非阻塞重试。
我做了以下事情。 autoCreateTopics 属性 为假!
@KafkaListener(
id = "customListenerId",
idIsGroup = false,
topics = "main-topic-1, main-topic-2")
@RetryableTopic(
autoCreateTopics = "false",
fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC,
backoff = @Backoff(delay = 10000),
attempts = "6",
kafkaTemplate = "kafkaTemplate",
include = {
AerospikeException.Timeout.class,
QueryTimeoutException.class,
HystrixRuntimeException.class})
public void consumePerTopic(@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Payload byte[] payload,
Acknowledgment acknowledgment) {
processor.process(objectMapper.readValue(payload, CreateRequest.class));
acknowledgment.acknowledge();
}
我得到了以下集成测试,它使用 kafka 作为嵌入式容器。
测试模拟数据库连接问题 - 被测代码必须抛出 AerospikeException.Timeout,这在消费者中被声明为可重试。
@Test
void shouldNotCreateEntityForTopicsWhenDbTimedOutAndProduceRetryMessages() {
performWithDbLatency(Duration.ofMillis(1000), () -> {
var createEntityIdempotencyKeys = entityKafkaSteps.sendCreateRequestToAllTopicsWithIdempotency(
getCreateEntityRequest("entity-owner-a"));
retryEntityKafkaSteps.consumeRetryCreateRequestsByIdempotencyKeys(createEntityIdempotencyKeys);
});
}
我的嵌入式 kafka 容器配置如下
embedded:
kafka:
topicsToCreate: main-topic-1, main-topic-2
我了解 kafka 的默认行为是允许自动创建主题。
该测试成功通过,但据我了解,autoCreateTopics 属性 应该阻止重试和 dlt 主题创建,即使它在 kafka 代理中是允许的 -> 并且测试必须失败,因为代理中没有重试和 dlt 主题。
请说明这是不是预期的行为,或者我是否遗漏了什么?
如果使用 doNotAutoCreateRetryTopics() 方法通过 RetryTopicConfiguration bean 声明重试功能,则会出现相同的行为。
非常感谢。
已编辑:
我主要担心的是:
- 我禁止 spring-kafka 为我的消费者创建 main-topic-1 和 main-topic-2 的重试和 dlt 主题 使用
autoCreateTopics = "false"
.
- 我创建 ONLY main-topic-1 和 main-topic-2我的嵌入式 kafka 容器使用
embedded.kafka.topicsToCreate
属性。 (Kafka 代理默认 属性 是 allow.auto.create.topics = true
)
- 在运行时我可以看到 main-topic-1-retry, main-topic-1-dlt, main-topic-2-retry 和 main-topic-2-dlt 仍然创建。
No autoCreateTopics = "false"
仅表示框架不会创建主题;它与代理是否配置为创建它们无关。
@EmbeddedKafka(topics = ...)
将始终创建主题。
要在嵌入式 kafka 代理中禁用主题创建,请使用
@EmbeddedKafka(brokerProperties = "auto.create.topics.enable:false")
并且不要在此处指定任何 topics
。
或者你可以将消费者属性allow.auto.create.topics
设置为false
;它会覆盖代理 属性(如果为真)。
https://kafka.apache.org/documentation/#consumerconfigs_allow.auto.create.topics
@Gary Russel 非常感谢您的耐心等待 - 我终于明白了根本原因。主题不是由 spring-kafka 创建的,而是由在我的测试中连接到重试和 dlt 主题的消费者创建的 -> 代理允许它这样做 -> 主题是自动创建的。
感谢您的帮助!
我正在尝试使用具有非阻塞重试功能的 spring-kafka 2.8.0 为我的项目实现非阻塞重试。
我做了以下事情。 autoCreateTopics 属性 为假!
@KafkaListener(
id = "customListenerId",
idIsGroup = false,
topics = "main-topic-1, main-topic-2")
@RetryableTopic(
autoCreateTopics = "false",
fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC,
backoff = @Backoff(delay = 10000),
attempts = "6",
kafkaTemplate = "kafkaTemplate",
include = {
AerospikeException.Timeout.class,
QueryTimeoutException.class,
HystrixRuntimeException.class})
public void consumePerTopic(@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Payload byte[] payload,
Acknowledgment acknowledgment) {
processor.process(objectMapper.readValue(payload, CreateRequest.class));
acknowledgment.acknowledge();
}
我得到了以下集成测试,它使用 kafka 作为嵌入式容器。 测试模拟数据库连接问题 - 被测代码必须抛出 AerospikeException.Timeout,这在消费者中被声明为可重试。
@Test
void shouldNotCreateEntityForTopicsWhenDbTimedOutAndProduceRetryMessages() {
performWithDbLatency(Duration.ofMillis(1000), () -> {
var createEntityIdempotencyKeys = entityKafkaSteps.sendCreateRequestToAllTopicsWithIdempotency(
getCreateEntityRequest("entity-owner-a"));
retryEntityKafkaSteps.consumeRetryCreateRequestsByIdempotencyKeys(createEntityIdempotencyKeys);
});
}
我的嵌入式 kafka 容器配置如下
embedded:
kafka:
topicsToCreate: main-topic-1, main-topic-2
我了解 kafka 的默认行为是允许自动创建主题。 该测试成功通过,但据我了解,autoCreateTopics 属性 应该阻止重试和 dlt 主题创建,即使它在 kafka 代理中是允许的 -> 并且测试必须失败,因为代理中没有重试和 dlt 主题。
请说明这是不是预期的行为,或者我是否遗漏了什么? 如果使用 doNotAutoCreateRetryTopics() 方法通过 RetryTopicConfiguration bean 声明重试功能,则会出现相同的行为。
非常感谢。
已编辑: 我主要担心的是:
- 我禁止 spring-kafka 为我的消费者创建 main-topic-1 和 main-topic-2 的重试和 dlt 主题 使用
autoCreateTopics = "false"
. - 我创建 ONLY main-topic-1 和 main-topic-2我的嵌入式 kafka 容器使用
embedded.kafka.topicsToCreate
属性。 (Kafka 代理默认 属性 是allow.auto.create.topics = true
) - 在运行时我可以看到 main-topic-1-retry, main-topic-1-dlt, main-topic-2-retry 和 main-topic-2-dlt 仍然创建。
No autoCreateTopics = "false"
仅表示框架不会创建主题;它与代理是否配置为创建它们无关。
@EmbeddedKafka(topics = ...)
将始终创建主题。
要在嵌入式 kafka 代理中禁用主题创建,请使用
@EmbeddedKafka(brokerProperties = "auto.create.topics.enable:false")
并且不要在此处指定任何 topics
。
或者你可以将消费者属性allow.auto.create.topics
设置为false
;它会覆盖代理 属性(如果为真)。
https://kafka.apache.org/documentation/#consumerconfigs_allow.auto.create.topics
@Gary Russel 非常感谢您的耐心等待 - 我终于明白了根本原因。主题不是由 spring-kafka 创建的,而是由在我的测试中连接到重试和 dlt 主题的消费者创建的 -> 代理允许它这样做 -> 主题是自动创建的。
感谢您的帮助!