Transactional Kafka listener retry

I am trying to create a Spring Kafka @KafkaListener that is both transactional (Kafka and database) and uses retry. I am using Spring Boot. The error handler documentation says:

When transactions are being used, no error handlers are configured, by default, so that the exception will roll back the transaction. Error handling for transactional containers are handled by the AfterRollbackProcessor. If you provide a custom error handler when using transactions, it must throw an exception if you want the transaction rolled back (source).

However, when I configure my listener with the @Transactional("kafkaTransactionManager") annotation, the container actually uses a non-null commonErrorHandler rather than the AfterRollbackProcessor, even though I can clearly see the template roll back produced messages when an exception is thrown. This is true even if I explicitly configure the commonErrorHandler to null in the container factory. I see no evidence that the AfterRollbackProcessor I configured is ever invoked, even after the commonErrorHandler exhausts its retry policy.

I am not sure how Spring Kafka's error handling is supposed to work in this situation, and I am looking for clarification: why is the commonErrorHandler being used here instead of the AfterRollbackProcessor, and how do I get retry to work together with transactions?

Thanks!

My code:

@Configuration
@EnableScheduling
@EnableKafka
public class KafkaConfiguration {

  private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConfiguration.class);

  @Bean
  public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
      ConsumerFactory<Object, Object> consumerFactory) {
    var factory = new ConcurrentKafkaListenerContainerFactory<Integer, Object>();
    factory.setConsumerFactory(consumerFactory);

    var afterRollbackProcessor =
        new DefaultAfterRollbackProcessor<Object, Object>(
            (record, e) -> LOGGER.info("After rollback processor triggered! {}", e.getMessage()),
            new FixedBackOff(1_000, 1));

    // Configures different error handling for different listeners.
    factory.setContainerCustomizer(
        container -> {
          var groupId = container.getContainerProperties().getGroupId();
          if (groupId.equals("InputProcessorHigh") || groupId.equals("InputProcessorLow")) {
            container.setAfterRollbackProcessor(afterRollbackProcessor);
            // If I set commonErrorHandler to null, it is defaulted instead.
          }
        });
    return factory;
  }
}
@Component
public class InputProcessor {

  private static final Logger LOGGER = LoggerFactory.getLogger(InputProcessor.class);

  private final KafkaTemplate<Integer, Object> template;
  private final AuditLogRepository repository;

  @Autowired
  public InputProcessor(KafkaTemplate<Integer, Object> template, AuditLogRepository repository) {
    this.template = template;
    this.repository = repository;
  }

  @KafkaListener(id = "InputProcessorHigh", topics = "input-high", concurrency = "3")
  @Transactional("kafkaTransactionManager")
  public void inputHighProcessor(ConsumerRecord<Integer, Input> input) {
    processInputs(input);
  }

  @KafkaListener(id = "InputProcessorLow", topics = "input-low", concurrency = "1")
  @Transactional("kafkaTransactionManager")
  public void inputLowProcessor(ConsumerRecord<Integer, Input> input) {
    processInputs(input);
  }

  public void processInputs(ConsumerRecord<Integer, Input> input) {
    var key = input.key();
    var message = input.value().getMessage();
    var output = new Output().setMessage(message);

    LOGGER.info("Processing {}", message);
    template.send("output-left", key, output);
    repository.createIfNotExists(message); // idempotent insert
    template.send("output-right", key, output);

    if (message.contains("ERROR")) {
      throw new RuntimeException("Simulated processing error!");
    }
  }
}

My application.yaml (minus my bootstrap-servers and security configuration):

spring:
  kafka:
    consumer:
      auto-offset-reset: 'earliest'
      key-deserializer: 'org.apache.kafka.common.serialization.IntegerDeserializer'
      value-deserializer: 'org.springframework.kafka.support.serializer.JsonDeserializer'
      isolation-level: 'read_committed'
      properties:
        spring.json.trusted.packages: 'java.util,java.lang,com.github.tomboyo.silverbroccoli.*'
    producer:
      transaction-id-prefix: 'tx-'
      key-serializer: 'org.apache.kafka.common.serialization.IntegerSerializer'
      value-serializer: 'org.springframework.kafka.support.serializer.JsonSerializer'

[EDIT] (solution code)

With Gary's help, I figured it out. As they say, we need to set the Kafka transaction manager on the container so that the container can start the transaction. The transactions documentation does not describe how to do this, but there are a couple of ways. First, we can get the mutable container properties object from the factory and set the transaction manager on that:
@Bean
  public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(...) {
    var factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.getContainerProperties().setTransactionManager(...);
    return factory;
  }
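
For example, here is a minimal sketch assuming Spring Boot has already auto-configured a KafkaTransactionManager bean (it does so when spring.kafka.producer.transaction-id-prefix is set, as in the application.yaml above), so we can simply inject it:

@Bean
  public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
      ConsumerFactory<Object, Object> consumerFactory,
      KafkaTransactionManager<?, ?> kafkaTransactionManager) {
    var factory = new ConcurrentKafkaListenerContainerFactory<Object, Object>();
    factory.setConsumerFactory(consumerFactory);
    // The container now starts a Kafka transaction around each delivery and
    // sends the consumed offsets to that transaction before committing it.
    factory.getContainerProperties().setTransactionManager(kafkaTransactionManager);
    return factory;
  }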

If we are on Spring Boot, we can reuse some of the autoconfiguration to set sensible defaults on our factory before customizing it. We can see that KafkaAutoConfiguration imports KafkaAnnotationDrivenConfiguration, which produces a ConcurrentKafkaListenerContainerFactoryConfigurer bean. This appears to be responsible for all of the default configuration in a Spring Boot application, so we can inject that bean and use it to initialize our factory before adding our customizations:

@Bean
  public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
      ConcurrentKafkaListenerContainerFactoryConfigurer bootConfigurer,
      ConsumerFactory<Object, Object> consumerFactory) {
    var factory = new ConcurrentKafkaListenerContainerFactory<Object, Object>();

    // Apply default spring-boot configuration.
    bootConfigurer.configure(factory, consumerFactory);

    factory.setContainerCustomizer(
        container -> {
          ... // do whatever
        });
    return factory;
  }

Once that is done, the container uses the AfterRollbackProcessor for error handling as expected. As long as I do not explicitly configure a commonErrorHandler, this appears to be the only layer of exception handling.

The AfterRollbackProcessor is only used when the container knows about the transaction; you must provide the container with a KafkaTransactionManager so that the container starts the Kafka transaction and sends the offsets to the transaction. Using @Transactional is not the correct way to start a Kafka transaction.

https://docs.spring.io/spring-kafka/docs/current/reference/html/#transactions
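
For illustration, a minimal sketch of what one of the listeners from the question looks like once the container manages the Kafka transaction (the only change is dropping the @Transactional annotation):

// No @Transactional needed: the container starts the Kafka transaction, sends
// the consumed offsets to it, and rolls it back if the listener throws. The
// configured AfterRollbackProcessor then takes over (retries, then recovery).
@KafkaListener(id = "InputProcessorHigh", topics = "input-high", concurrency = "3")
public void inputHighProcessor(ConsumerRecord<Integer, Input> input) {
  processInputs(input);
}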