Cannot publish to dead letter topic on transaction failure
In my application, I have Kafka configured as follows.
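import org.springframework.boot.autoconfigure.kafka.ConcurrentKafkaListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultAfterRollbackProcessor;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.kafka.support.converter.StringJsonMessageConverter;
import org.springframework.kafka.transaction.KafkaTransactionManager;
import org.springframework.util.backoff.FixedBackOff;

@Configuration
public class KafkaConfiguration {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ConsumerFactory<Object, Object> consumerFactory,
            KafkaTransactionManager<?, ?> kafkaTransactionManager,
            KafkaTemplate<Object, Object> template) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        configurer.configure(factory, consumerFactory);
        // Run each listener invocation inside a Kafka transaction
        factory.getContainerProperties().setTransactionManager(kafkaTransactionManager);
        // Send all failed messages from topic XYZ to a dead-letter topic with the .DLT suffix (i.e. XYZ.DLT)
        factory.setAfterRollbackProcessor(new DefaultAfterRollbackProcessor<>(
                new DeadLetterPublishingRecoverer(template), new FixedBackOff(50, 1)));
        return factory;
    }

    @Bean
    public RecordMessageConverter converter() {
        return new StringJsonMessageConverter();
    }
}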
Below is my application.yml:
spring:
  kafka:
    admin:
      fail-fast: true
    consumer:
      bootstrap-servers: 127.0.0.1:9092
      group-id: wfo
      enable-auto-commit: false
      auto-offset-reset: latest
      key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
      value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
      properties:
        spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
        spring.deserializer.value.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
        isolation.level: read_committed
    producer:
      bootstrap-servers: 127.0.0.1:9092
      transaction-id-prefix: tx.
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
To test the listener when the transaction fails, I created the following class.
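import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class EventListener {

    @KafkaListener(topics = "test_topic")
    public void listen(TestEvent event) {
        System.out.println("RECEIVED EVENT: " + event.getPayload());
        // Any payload containing "fail" forces a rollback of the Kafka transaction
        if (event.getPayload().contains("fail")) {
            throw new RuntimeException("TEST TRANSACTION FAILED");
        }
    }
}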
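When I publish a TestEvent, I can see the payload printed on the console. When the payload includes the word fail, a RuntimeException is thrown and I see a Transaction rolled back error message on the console.
However, after about a minute of failed retries, I see the following exception on the console.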
2020-06-20 17:07:46,326 ERROR [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] o.s.kafka.support.LoggingProducerListener : Exception thrown when sending a message with key='null' and payload='{"customerCode":"DVTPRDFT411","payload":"MSGfail 90"}' to topic test_topic.DLT and partition 2:
org.apache.kafka.common.errors.TimeoutException: Topic test_topic.DLT not present in metadata after 60000 ms.
2020-06-20 17:07:46,327 ERROR [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] o.s.kafka.listener.DeadLetterPublishingRecoverer : Dead-letter publication failed for: ProducerRecord(topic=test_topic.DLT, partition=2, headers=RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [99, 111, 109, 46, 102, 116, 46, 100, 101, 109, 111, 46, 100, 116, 111, 46, 100, 101, 98, 117, 103, 46, 75, 97, 102, 107, 97, 69, 118, 101, 110, 116, 82, 101, 113, 117, 101, 115, 116, 36, 71, 101, 110, 101, 114, 105, 99, 69, 118, 101, 110, 116]), RecordHeader(key = kafka_dlt-original-topic, value = [116, 101, 115, 116, 95, 116, 111, 112, 105, 99]), RecordHeader(key = kafka_dlt-original-partition, value = [0, 0, 0, 2]), RecordHeader(key = kafka_dlt-original-offset, value = [0, 0, 0, 0, 0, 0, 0, 0]), RecordHeader(key = kafka_dlt-original-timestamp, value = [0, 0, 1, 114, -48, -29, -90, 47]), RecordHeader(key = kafka_dlt-original-timestamp-type, value = [67, 114, 101, 97, 116, 101, 84, 105, 109, 101]), RecordHeader(key = kafka_dlt-exception-fqcn, value = [111, 114, 103, 46, 115, 112, 114, 105, 110, 103, 102, 114, 97, 109, 101, 119, 111, 114, 107, 46, 107, 97, 102, 107, 97, 46, 108, 105, 115, 116, 101, 110, 101, 114, 46, 76, 105, 115, 116, 101, 110, 101, 114, 69, 120, 101, 99, 117, 116, 105, 111, 110, 70, 97, 105, 108, 101, 100, 69, 120, 99, 101, 112, 116, 105, 111, 110]), RecordHeader(key = kafka_dlt-exception-message, value = [76, 105, 115, 116, 101, 110, 101, 114, 32, 109, 101, 116, 104, 111, 100, 32, 39, 112, 117, 98, 108, 105, 99, 32, 118, 111, 105, 100, 32, 99, 111, 109, 46, 102, 116, 46, 100, 101, 109, 111, 46, 115, 101, 114, 118, 105, 99, 101, 46, 69, 118, 101, 110, 116, 76, 105, 115, 116, 101, 110, 101, 114, 46, 108, 105, 115, 116, 101, 110, 40, 99, 111, 109, 46, 102, 116, 46, 101, 118, 101, 110, 116, 46, 109, 97, 110, 97, 103, 101, 109, 101, 110, 116, 46, 100, 101, 102, 105, 110, 105, 116, 105, 111, 110, 46, 84, 101, 115, 116, 69, 118, 101, 110, 116, 41, 39, 
32, 116, 104, 114, 101, 119, 32, 101, 120, 99, 101, 112, 116, 105, 111, 110, 59, 32, 110, 101, 115, 116, 101, 100, 32, 101, 120, 99, 101, 112, 116, 105, 111, 110, 32, 105, 115, 32, 106, 97, 118, 97, 46, 108, 97, 110, 103, 46, 82, 117, 110, 116, 105, 109, 101, 69, 120, 99, 101, 112, 116, 105, 111, 110, 58, 32, 84, 69, 83, 84, 32, 84, 82, 65, 78, 83, 65, 67, 84, 73, 79, 78, 32, 70, 65, 73, 76, 69, 68, 59, 32, 110, 101, 115, 116, 101, 100, 32, 101, 120, 99, 101, 112, 116, 105, 111, 110, 32, 105, 115, 32, 106, 97, 118, 97, 46, 108, 97, 110, 103, 46, 82, 117, 110, 116, 105, 109, 101, 69, 120, 99, 101, 112, 116, 105, 111, 110, 58, 32, 84, 69, 83, 84, 32, 84, 82, 65, 78, 83, 65, 67, 84, 73, 79, 78, 32, 70, 65, 73, 76, 69, 68]), RecordHeader(key = kafka_dlt-exception-stacktrace, value = [111, 100]), RecordHeader(key = b3, value = [100, 57, 51, 100, 56, 97, 98, 57, 55, 54, 97, 100, 54, 102, 49, 100, 45, 100, 57, 51, 100, 56, 97, 98, 57, 55, 54, 97, 100, 54, 102, 49, 100, 45, 49])], isReadOnly = false), key=null, value={"customerCode":"DVTPRDFT411","payload":"MSGfail 90"}, timestamp=null)
org.springframework.kafka.KafkaException: Send failed; nested exception is org.apache.kafka.common.errors.TimeoutException: Topic test_topic.DLT not present in metadata after 60000 ms.
at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:570)
at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:385)
at org.springframework.kafka.listener.DeadLetterPublishingRecoverer.publish(DeadLetterPublishingRecoverer.java:278)
at org.springframework.kafka.listener.DeadLetterPublishingRecoverer.lambda$accept(DeadLetterPublishingRecoverer.java:209)
at org.springframework.kafka.core.KafkaTemplate.executeInTransaction(KafkaTemplate.java:463)
at org.springframework.kafka.listener.DeadLetterPublishingRecoverer.accept(DeadLetterPublishingRecoverer.java:208)
at org.springframework.kafka.listener.DeadLetterPublishingRecoverer.accept(DeadLetterPublishingRecoverer.java:54)
at org.springframework.kafka.listener.FailedRecordTracker.skip(FailedRecordTracker.java:106)
at org.springframework.kafka.listener.SeekUtils.lambda$doSeeks(SeekUtils.java:84)
at java.util.ArrayList.forEach(Unknown Source)
at org.springframework.kafka.listener.SeekUtils.doSeeks(SeekUtils.java:81)
at org.springframework.kafka.listener.DefaultAfterRollbackProcessor.process(DefaultAfterRollbackProcessor.java:102)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.recordAfterRollback(KafkaMessageListenerContainer.java:1700)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListenerInTx(KafkaMessageListenerContainer.java:1662)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1614)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1348)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1064)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:972)
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
at java.util.concurrent.FutureTask.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
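For some reason, the code cannot publish the event to the DLT topic. I tried editing the broker configuration to include listeners=PLAINTEXT://:9092, but it did not help.
I would appreciate it if you could point me in the right direction to resolve this issue.
Update:
When I run the kafka-topics --list --zookeeper localhost:2181 command, I do see the DLT topic.
By the way, this may be a bug in the latest Spring Kafka version. I downgraded to Kafka v2.4.1 and Spring Boot v2.2.7, and it works fine. I reported the bug here:
https://github.com/spring-projects/spring-kafka/issues/1516
If some new configuration is required in Kafka v2.5.0 to make this work, please let me know.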
org.apache.kafka.common.errors.TimeoutException: Topic test_topic.DLT not present in metadata after 60000 ms.
Judging from the error, it looks like you do not have a test_topic.DLT topic.
For every topic you create, say XXX, you need to create its corresponding XXX.DLT topic.
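One way to create the dead-letter topic from the application itself is to declare it as a NewTopic bean, which the KafkaAdmin auto-configured by Spring Boot should create at startup if it is missing. The following is only a sketch: the class name, the partition count of 3, and the replication factor of 1 are assumptions and must be adjusted to match the real test_topic.

```java
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class DeadLetterTopicConfiguration {

    // Assumption: test_topic has 3 partitions. Use the actual partition count
    // of the original topic so that every partition index exists in the DLT.
    private static final int ORIGINAL_TOPIC_PARTITIONS = 3;

    @Bean
    public NewTopic testTopicDlt() {
        return TopicBuilder.name("test_topic.DLT")
                .partitions(ORIGINAL_TOPIC_PARTITIONS)
                .replicas(1) // assumption: single-broker development setup
                .build();
    }
}
```

Declaring the topic explicitly avoids relying on broker-side auto-creation, which would typically use the broker's default partition count rather than the original topic's.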
As a side note, ErrorHandlingDeserializer2 has been deprecated since 2.5 in favor of ErrorHandlingDeserializer.
Update:
When you list the topics with kafka-topics.bat --bootstrap-server 127.0.0.1:9092 --list, can you see test_topic.DLT? And when you describe it with kafka-topics.bat --bootstrap-server 127.0.0.1:9092 --describe --topic test_topic.DLT, can you see the same number of partitions as the original topic, and more precisely, can you see partition 2?
According to the Spring Kafka documentation:
"By default, the dead-letter record is sent to a topic named <originalTopic>.DLT (the original topic name suffixed with .DLT) and to the same partition as the original record."
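To make the partition question concrete, here is a small plain-Java model of the default rule quoted above. It is not Spring Kafka code; the class and method names are hypothetical, and it only illustrates why a record that failed on partition 2 needs the DLT to have at least 3 partitions.

```java
public class DefaultDltDestination {

    /** Models the documented default: original topic name suffixed with ".DLT". */
    public static String resolveTopic(String originalTopic) {
        return originalTopic + ".DLT";
    }

    /**
     * The dead-letter record targets the same partition index as the original
     * record, so that index must exist in the dead-letter topic.
     */
    public static boolean canReceive(int originalPartition, int dltPartitionCount) {
        return originalPartition >= 0 && originalPartition < dltPartitionCount;
    }

    public static void main(String[] args) {
        System.out.println(resolveTopic("test_topic")); // prints "test_topic.DLT"
        System.out.println(canReceive(2, 1));           // prints "false": a 1-partition DLT has no partition 2
        System.out.println(canReceive(2, 3));           // prints "true"
    }
}
```

If the dead-letter topic exists but has fewer partitions than the original, the producer may wait for metadata on a partition that never appears, which would be consistent with the timeout in the log above.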