如何在 Spring Boot 中创建 Kafka 消费者监听器,使其在消息被拒绝后,经过一段可变的时间再重新消费该消息
How to create a Kafka consumer listener in Spring Boot that, when a message is rejected, retries consuming it after a variable delay
我在 springboot 应用程序中有一个简单的 kafka 消费者监听器,如下所示:
/**
 * Listener for the {@code mytopic} topic that logs each received message.
 *
 * @param message the message value handed to the listener
 */
@KafkaListener(topics="mytopic")
public void receive(String message) {
    // Fixed: the original passed the undeclared name "messge" to the logger.
    LOGGER.info("received message='{}'", message);
}
在某些特定情况下我想拒绝该消息,
但我希望系统在一定时间后把这条消息重新投递给我;
我该怎么办?
注意:我也希望kafka的配置是定制的(不是默认的springboot结构)
请参见 Spring for Apache Kafka 参考文档中的 “Retrying Deliveries” 和 “Stateful Retry” 两节。
使用您想要的重试特性配置侦听器工厂,并(可选)添加 SeekToCurrentErrorHandler。
我的实现正是您需要的:
1) Kafka 配置类:从自定义属性中读取各字段,并在消息被拒绝后等待
5000 毫秒再重试(重试逻辑在 kafkaListenerContainerFactory 方法内):
/**
 * Custom Kafka consumer configuration.
 *
 * <p>Declares the consumer properties, the consumer factory and a listener
 * container factory that uses manual-immediate acknowledgment together with a
 * retry template, so that a listener which throws gets the same message again
 * after a fixed delay.
 */
@Configuration
public class KafkaConfig {
    //...

    /**
     * Builds the raw Kafka consumer properties from the fields of this class.
     *
     * <p>Auto-commit is disabled. When {@code enableSsl} is set, the SSL
     * properties are added; the single {@code sslPassword} value is used for
     * the truststore, the keystore and the key.
     *
     * @return a mutable map of consumer properties
     */
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        if (enableSsl) {
            // configure the following three settings for SSL Encryption
            props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
            props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, sslTruststoreLocation);
            props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, sslPassword);
            // configure the following three settings for SSL Authentication
            props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, sslKeystoreLocation);
            props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, sslPassword);
            props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, sslPassword);
        }
        return props;
    }

    /**
     * Creates the consumer factory backed by {@link #consumerConfigs()}.
     *
     * @return the consumer factory for String keys and String values
     */
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    /**
     * Creates the listener container factory with manual-immediate
     * acknowledgment and stateless retry.
     *
     * <p>The retry template uses {@code AlwaysRetryPolicy} (no upper bound on
     * attempts) and a fixed back-off of 5000 ms between attempts.
     *
     * @return the configured listener container factory
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        // Build the retry template completely before handing it to the factory.
        RetryTemplate retryTemplate = new RetryTemplate();
        // infinite number of retry attempts
        retryTemplate.setRetryPolicy(new AlwaysRetryPolicy());
        // wait a "waitingTime" time before retrying
        long waitingTime = 5000L;
        FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();
        fixedBackOffPolicy.setBackOffPeriod(waitingTime);
        retryTemplate.setBackOffPolicy(fixedBackOffPolicy);
        // or use exponential waiting
        //ExponentialBackOffPolicy expBackoff = new ExponentialBackOffPolicy();
        //expBackoff.setInitialInterval(...);
        //expBackoff.setMaxInterval(...);
        //retryTemplate.setBackOffPolicy(expBackoff);

        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        // Reuse the consumerFactory() bean instead of creating a second, unmanaged factory.
        factory.setConsumerFactory(consumerFactory());
        ContainerProperties containerProperties = factory.getContainerProperties();
        containerProperties.setAckMode(AckMode.MANUAL_IMMEDIATE);
        // NOTE(review): with stateless retry the back-off presumably runs on the
        // consumer thread; confirm max.poll.interval.ms tolerates the waiting time.
        factory.setStatefulRetry(false);
        factory.setRetryTemplate(retryTemplate);
        return factory;
    }
}
2) 消费消息的类:
/**
 * Kafka listener service that acknowledges a message when it can be processed
 * and rejects it by throwing otherwise.
 */
@Service
public class Consumer {
    private static final Logger logger = LoggerFactory.getLogger(Consumer.class);
    //...

    /**
     * Handles one message from the topic configured under {@code kafka.topics.test}.
     *
     * <p>When {@code processMessage} is true the message is logged and
     * acknowledged. Otherwise the failure is logged and an exception is thrown
     * without acknowledging.
     *
     * @param message the message value
     * @param ack handle used to acknowledge the message manually
     * @throws IOException when the message is rejected
     */
    @KafkaListener(topics="${kafka.topics.test}")
    public void consume(String message, Acknowledgment ack) throws IOException {
        if (!processMessage) {
            // Parameterized logging, consistent with the "{}" style used elsewhere in this file.
            logger.error("##KAFKA## -> Failed message -> {}", message);
            throw new IOException("reject message");
        }
        logger.info("##KAFKA## -> Consumed message -> {}", message);
        ack.acknowledge();
    }
}
我在 springboot 应用程序中有一个简单的 kafka 消费者监听器,如下所示:
/**
 * Listener for the {@code mytopic} topic that logs each received message.
 *
 * @param message the message value handed to the listener
 */
@KafkaListener(topics="mytopic")
public void receive(String message) {
    // Fixed: the original passed the undeclared name "messge" to the logger.
    LOGGER.info("received message='{}'", message);
}
在某些特定情况下我想拒绝该消息, 但我希望系统在一定时间后把这条消息重新投递给我;
我该怎么办?
注意:我也希望kafka的配置是定制的(不是默认的springboot结构)
请参见 Spring for Apache Kafka 参考文档中的 “Retrying Deliveries” 和 “Stateful Retry” 两节。
使用您想要的重试特性配置侦听器工厂,并(可选)添加 SeekToCurrentErrorHandler。
我的实现正是您需要的:
1) Kafka 配置类:从自定义属性中读取各字段,并在消息被拒绝后等待 5000 毫秒再重试(重试逻辑在 kafkaListenerContainerFactory 方法内):
/**
 * Custom Kafka consumer configuration.
 *
 * <p>Declares the consumer properties, the consumer factory and a listener
 * container factory that uses manual-immediate acknowledgment together with a
 * retry template, so that a listener which throws gets the same message again
 * after a fixed delay.
 */
@Configuration
public class KafkaConfig {
    //...

    /**
     * Builds the raw Kafka consumer properties from the fields of this class.
     *
     * <p>Auto-commit is disabled. When {@code enableSsl} is set, the SSL
     * properties are added; the single {@code sslPassword} value is used for
     * the truststore, the keystore and the key.
     *
     * @return a mutable map of consumer properties
     */
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        if (enableSsl) {
            // configure the following three settings for SSL Encryption
            props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
            props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, sslTruststoreLocation);
            props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, sslPassword);
            // configure the following three settings for SSL Authentication
            props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, sslKeystoreLocation);
            props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, sslPassword);
            props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, sslPassword);
        }
        return props;
    }

    /**
     * Creates the consumer factory backed by {@link #consumerConfigs()}.
     *
     * @return the consumer factory for String keys and String values
     */
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    /**
     * Creates the listener container factory with manual-immediate
     * acknowledgment and stateless retry.
     *
     * <p>The retry template uses {@code AlwaysRetryPolicy} (no upper bound on
     * attempts) and a fixed back-off of 5000 ms between attempts.
     *
     * @return the configured listener container factory
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        // Build the retry template completely before handing it to the factory.
        RetryTemplate retryTemplate = new RetryTemplate();
        // infinite number of retry attempts
        retryTemplate.setRetryPolicy(new AlwaysRetryPolicy());
        // wait a "waitingTime" time before retrying
        long waitingTime = 5000L;
        FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();
        fixedBackOffPolicy.setBackOffPeriod(waitingTime);
        retryTemplate.setBackOffPolicy(fixedBackOffPolicy);
        // or use exponential waiting
        //ExponentialBackOffPolicy expBackoff = new ExponentialBackOffPolicy();
        //expBackoff.setInitialInterval(...);
        //expBackoff.setMaxInterval(...);
        //retryTemplate.setBackOffPolicy(expBackoff);

        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        // Reuse the consumerFactory() bean instead of creating a second, unmanaged factory.
        factory.setConsumerFactory(consumerFactory());
        ContainerProperties containerProperties = factory.getContainerProperties();
        containerProperties.setAckMode(AckMode.MANUAL_IMMEDIATE);
        // NOTE(review): with stateless retry the back-off presumably runs on the
        // consumer thread; confirm max.poll.interval.ms tolerates the waiting time.
        factory.setStatefulRetry(false);
        factory.setRetryTemplate(retryTemplate);
        return factory;
    }
}
2) 消费消息的类:
/**
 * Kafka listener service that acknowledges a message when it can be processed
 * and rejects it by throwing otherwise.
 */
@Service
public class Consumer {
    private static final Logger logger = LoggerFactory.getLogger(Consumer.class);
    //...

    /**
     * Handles one message from the topic configured under {@code kafka.topics.test}.
     *
     * <p>When {@code processMessage} is true the message is logged and
     * acknowledged. Otherwise the failure is logged and an exception is thrown
     * without acknowledging.
     *
     * @param message the message value
     * @param ack handle used to acknowledge the message manually
     * @throws IOException when the message is rejected
     */
    @KafkaListener(topics="${kafka.topics.test}")
    public void consume(String message, Acknowledgment ack) throws IOException {
        if (!processMessage) {
            // Parameterized logging, consistent with the "{}" style used elsewhere in this file.
            logger.error("##KAFKA## -> Failed message -> {}", message);
            throw new IOException("reject message");
        }
        logger.info("##KAFKA## -> Consumed message -> {}", message);
        ack.acknowledge();
    }
}