spring cloud stream kafka binder enableDlq is not working as I expected
I am using the spring-cloud-stream Kafka binder with Schema Registry (not Kafka Streams).
What I want is for messages that cannot be deserialized on the input topic to be sent to a DLQ.
So I tried the configuration below, but the Spring Cloud Stream application keeps retrying indefinitely and logs:
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
spring.cloud.stream:
  function:
    definition: functionName
  bindings:
    functionName-in-0:
      group: group-name
      destination: input-topic
  kafka:
    binder:
      brokers: localhost:9092
    bindings:
      functionName-in-0:
        consumer:
          enableDlq: true
          dlqName: input-topic-dlq
          autoCommitOnError: true
          autoCommitOffset: true
    default:
      consumer:
        configuration:
          schema.registry.url: http://localhost:8081
          key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
What am I doing wrong? Please help.
Please don't ask the same question in multiple places; it wastes your time and ours. As I already answered on Gitter:
This error occurs too far down the stack for spring-cloud-stream to help with it. You need to use a ListenerContainerCustomizer @Bean to configure a SeekToCurrentErrorHandler with a DeadLetterPublishingRecoverer, and configure an ErrorHandlingDeserializer.
https://docs.spring.io/spring-kafka/docs/current/reference/html/#error-handling-deserializer
https://docs.spring.io/spring-kafka/docs/current/reference/html/#seek-to-current
https://docs.spring.io/spring-kafka/docs/current/reference/html/#dead-letters
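To make the advice above concrete, here is a minimal sketch of such a customizer bean, assuming spring-kafka 2.5+ and spring-cloud-stream 3.x; the `KafkaOperations` bean and the `-dlq` topic-naming convention are illustrative, not prescribed by the answer:

```java
import org.apache.kafka.common.TopicPartition;
import org.springframework.cloud.stream.config.ListenerContainerCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

@Configuration
public class DlqConfig {

    @Bean
    public ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> customizer(
            KafkaOperations<byte[], byte[]> template) {
        return (container, destination, group) -> {
            // Publish failed records to "<original topic>-dlq", same partition.
            DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(
                    template,
                    (record, ex) -> new TopicPartition(record.topic() + "-dlq", record.partition()));
            // Retry twice, then hand the record to the recoverer
            // instead of seeking/retrying forever.
            container.setErrorHandler(
                    new SeekToCurrentErrorHandler(recoverer, new FixedBackOff(0L, 2L)));
        };
    }
}
```

For the deserialization failure itself, the consumer's `value.deserializer` would be set to `org.springframework.kafka.support.serializer.ErrorHandlingDeserializer`, with the real Avro deserializer supplied via the `spring.deserializer.value.delegate.class` property, so that the failure surfaces as a listener error the handler above can recover.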
That said, Stack Overflow is a better medium for questions like this.