Spring Cloud Stream 中的批处理模式和自定义错误处理
Batch Mode and Custom Error Handling in Spring Cloud Stream
在开始使用 Spring Cloud Stream 之前,我使用的是 Spring-Kafka 及其对批量消费和自定义错误处理的支持。请注意此代码段的最后两行:
ConcurrentKafkaListenerContainerFactory<String, byte[]> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConcurrency(this.consumerConfigurationProperties.getConsumer().get(TOPIC_AG_TASK_EMPP).getConcurrency());
factory.setConsumerFactory(consumerFactory);
factory.setMessageConverter(avroMessageConverter);
factory.getContainerProperties().setPollTimeout(this.consumerConfigurationProperties.getConsumer().get(TOPIC_AG_TASK_EMPP).getPollTimeout());
factory.getContainerProperties().setPauseEnabled(true);
factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.BATCH);
factory.getContainerProperties().setErrorHandler(dlqAGTaskErrorHandler);
但是,对于 Spring Cloud Stream,我找不到如何配置它。我只能找到这些配置属性:
spring.cloud.stream.kafka.bindings.input.consumer.autoCommitOffset, enableDlq
因此,是否可以(是否可以)在 Spring Cloud Stream 中注册自定义错误处理程序并将 AckMode 设置为 BATCH?
感谢您的支持。
目前我们在 Spring 云流级别尚不支持这些选项。以下问题应在实施后提供等效选项(可能尽快在 Chelsea.RC1 中):
https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/70
https://github.com/spring-cloud/spring-cloud-stream/issues/538
在开始使用 Spring Cloud Stream 之前,我使用的是 Spring-Kafka 及其对批量消费和自定义错误处理的支持。请注意此代码段的最后两行:
ConcurrentKafkaListenerContainerFactory<String, byte[]> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConcurrency(this.consumerConfigurationProperties.getConsumer().get(TOPIC_AG_TASK_EMPP).getConcurrency());
factory.setConsumerFactory(consumerFactory);
factory.setMessageConverter(avroMessageConverter);
factory.getContainerProperties().setPollTimeout(this.consumerConfigurationProperties.getConsumer().get(TOPIC_AG_TASK_EMPP).getPollTimeout());
factory.getContainerProperties().setPauseEnabled(true);
factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.BATCH);
factory.getContainerProperties().setErrorHandler(dlqAGTaskErrorHandler);
但是,对于 Spring Cloud Stream,我找不到如何配置它。我只能找到这些配置属性:
spring.cloud.stream.kafka.bindings.input.consumer.autoCommitOffset, enableDlq
因此,是否可以(是否可以)在 Spring Cloud Stream 中注册自定义错误处理程序并将 AckMode 设置为 BATCH?
感谢您的支持。
目前我们在 Spring 云流级别尚不支持这些选项。以下问题应在实施后提供等效选项(可能尽快在 Chelsea.RC1 中):
https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/70
https://github.com/spring-cloud/spring-cloud-stream/issues/538