使用 Spring Cloud Stream 生成多个主题时的 DLQ、有界重试和 EOS

DLQ, bounded retry, and EOS when producing to multiple topics using Spring Cloud Stream

我正在尝试编写一个转换函数,它将使用来自一个主题的输入并为主题 Left 和 Right 产生两个输出。此外,我需要在事务中进行此操作,以便如果应用程序未能向 Right 生成消息,它将回滚到 Left 的尝试。此外,Kafka 应该在发生故障时重新尝试交付给转换的消费者,以便应用程序有多次机会从瞬态错误中恢复。但是,如果错误是长期错误,我需要应用程序在尝试一些次数(比如说 3 次)后放弃,此时应该将消息传送到 DLQ。

我了解如何使用命令式模型实现有界重试、DLQ 和事务。也就是说,我可以使用 Function<IN, OUT> 并获得 90% 的解决方案。但是,据我所知,目前 Function<IN, Tuple2<OUT1, OUT2>> 不是受支持的签名,我必须使用反应式编程模型来传递到多个主题。此外,据我所知,自动 DLQ 不是响应式消费者框架的一部分。它看起来也不像以相同的方式管理事务,并且消费者偏移量提交似乎是自动的,而不是以流处理的成功或失败为条件。

有人可以建议我如何编写消费者:

不需要回答我的问题。

以下是我到目前为止尝试过的,改编自我最初满足其他要求的单输出命令式函数。它旨在一次从输入主题中消耗一个整数,在事务中创建一个 audit_log 数据库记录(如果失败,回滚所有内容并重试),将数字转换为两个 json标记为“左”或“右”的有效负载,然后将 json 发布到签名的输出 0 和 1。我试图将输出 3 用于 DLQ,因为来自反应流的错误不会像命令式模型那样自动进行 DLQ。这是探索性代码,只是为了学习诀窍,因此有随机机会抛出 RTE 以演练失败场景。

如果出现异常,则当前消息丢失,无需重试。如果应用程序发布了两条消息之一,而后者被异常中断,则前者仍会提交(似乎没有发生事务回滚)。最后,因为所有的错误都被删除了,所以它们永远不会到达我的代码以尝试将它们传递给 DLQ。

@Bean
@Transactional
public Function<Flux<Message<Integer>>, Tuple3<Flux<String>, Flux<String>, Flux<Object>>>
    numberToJson(AuditLogRepository repository) {
  var random = new Random();

  // I tried to create two sinks, one for each output topic. This works fine.
  var left = Sinks.many().unicast().<Integer>onBackpressureBuffer();
  var right = Sinks.many().unicast().<Integer>onBackpressureBuffer();

  // I tried to use this sink for the DLQ destination. I was hoping to manually shuffle UE's to the DLQ.
  var dlq = Sinks.many().unicast().onBackpressureBuffer();

  return flux -> {
    // Do a database operation (in a transaction).
    var persistent =
        flux.map(
                message -> {
                  var n = message.getPayload();
                  repository.createIfNotExists("Transformed n=" + n);
                  if (random.nextDouble() < 0.3) {
                    LOGGER.error("Transformer failure on n=" + n);
                    throw new RuntimeException("Transformer failure on n=" + n);
                  }
                  return message;
                })
            .doOnNext(
                message -> {
                  var n = message.getPayload();
                  left.tryEmitNext(n).orThrow();

                  // If only one side fails to publish, we want the txn to roll back.
                  if (random.nextDouble() < 0.1) {
                    LOGGER.error("Failed to publish right-side JSON: n=" + n);
                    throw new RuntimeException("Failed to publish right-side JSON: n=" + n);
                  }
                  right.tryEmitNext(n).orThrow();
                })
            .retry(3) // Make 3 attempts overall to process and publish. If that fails, continue
                      // to the DLQ.
            .onErrorContinue(
                (error, message) -> {
                  dlq.tryEmitNext(message).orThrowWithCause(error);
                })
            .retry() // If DLQ fails and this flux crashes, always restart it. The failed message
                     // will be redelivered.
            .publish()
            .autoConnect(3);

    // Split the "persistent" flux into three, which map to separate kafka topics and DLQ.
    return Tuples.of(
        left.asFlux()
            .doOnSubscribe(_sub -> persistent.subscribe())
            .map(n -> toJson(n, "left"))
            .retry(),
        right
            .asFlux()
            .doOnSubscribe(_sub -> persistent.subscribe())
            .map(n -> toJson(n, "right"))
            .retry(),
        dlq.asFlux().doOnSubscribe(_sub -> persistent.subscribe()).retry());
  };
}

最后,这是我的 application.yml 的相关部分。我有一些命令式尝试遗留下来的配置,例如 Kafka DLQ 设置。我省略了流中较早或较晚的发布者和消费者,因为我知道它们工作正常。

spring:
  cloud:
    stream:
      kafka:
        binder:
          transaction:
            transaction-id-prefix: 'tx-'
            producer:
              configuration:
                retries: 3
                acks: all
        bindings:
          numberToJson-in-0:
            consumer:
              enableDlq: true
          numberToJson-out-0:
            producer:
              topic.properties:
                retention.bytes: 10000
      bindings:
        numberToJson-in-0:
          destination: tx-number
          group: numberToJson
          consumer:
            # Not sure how this interacts with the flux retries, if at all.
            maxAttempts: 2
            properties:
              isolation.level: read_committed
        numberToJson-out-0:
          destination: tx-json-left
          producer:
            partitionCount: 3
        numberToJson-out-1:
          destination: tx-json-right
        numberToJson-out-2:
          # Manually wiring the function's 3rd output to a DLQ.
          destination: error.tx-number.numberToJson
    function:
      definition: numberToJson

[编辑] 这是我曾尝试配备 Tuple2 签名的命令式消费者,否则它可以工作:

@Bean
@Transactional
public Function<Integer, Tuple2<String, String>> numberToJson(
    AuditLogRepository repository) {
  var random = new Random();

  return n -> {
    LOGGER.info("Transforming n=" + n);
    var left = "{ \"n\": \"" + n + "\", \"side\": \"left\" }";
    var right = "{ \"n\": \"" + n + "\", \"side\": \"right\" }";

    repository.createIfNotExists("Transformed n=" + n);

    if (random.nextDouble() < 0.3) {
      LOGGER.error("Transformer failure on n=" + n);
      throw new RuntimeException("Transformer failure on n=" + n);
    }

    return Tuples.of(left, right);
  };
}

但是,这在运行时会遇到以下异常:

Exception in thread "main" org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'functionInitializer' defined in class path resource [org/springframework/cloud/stream/function/FunctionConfiguration.class]: Invocation of init method failed; nested exception is java.lang.UnsupportedOperationException: At the moment only Tuple-based function are supporting multiple arguments

谢谢!

使用 StreamBridge 并添加已配置的 AfterRollbackProcessor

以下示例包括 leftrightinput.DLT 上的消费者。

@SpringBootApplication(proxyBeanMethods = false)
public class So68928091Application {

    @Autowired
    StreamBridge bridge;

    public static void main(String[] args) {
        SpringApplication.run(So68928091Application.class, args);
    }

    @Bean
    Consumer<String> input() {
        return str -> {
            System.out.println(str);
            this.bridge.send("left", str.toUpperCase());
            this.bridge.send("right", str.toLowerCase());
            if (str.equals("Fail")) {
                throw new RuntimeException("test");
            }
        };
    }

    @SuppressWarnings({ "rawtypes", "unchecked" })
    @Bean
    ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer(BinderFactory binders) {
        return (container, dest, group) -> {
            ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder(null,
                    MessageChannel.class)).getTransactionalProducerFactory();
            KafkaTemplate<byte[], byte[]> template = new KafkaTemplate<>(pf);
            DefaultAfterRollbackProcessor rollbackProcessor = rollbackProcessor(template);
            container.setAfterRollbackProcessor(rollbackProcessor);
        };
    }

    DefaultAfterRollbackProcessor rollbackProcessor(KafkaTemplate<byte[], byte[]> template) {
        return new DefaultAfterRollbackProcessor<>(
                new DeadLetterPublishingRecoverer(template), new FixedBackOff(2000L, 2L), template, true);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("input.DLT").partitions(1).replicas(1).build();
    }

    @Bean
    public NewTopic topicLeft() {
        return TopicBuilder.name("left").partitions(1).replicas(1).build();
    }

    @Bean
    public NewTopic topicRight() {
        return TopicBuilder.name("right").partitions(1).replicas(1).build();
    }

    @KafkaListener(id = "left", topics = "left")
    public void listenLeft(String in) {
        System.out.println("left:" + in);
    }

    @KafkaListener(id = "right", topics = "right")
    public void listenRight(String in) {
        System.out.println("right:" + in);
    }

    @KafkaListener(id = "dlt", topics = "input.DLT")
    public void listenDlt(String in) {
        System.out.println("dlt:" + in);
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<byte[], byte[]> template) {
        return args -> {
            System.in.read();
            template.send("input", "Fail".getBytes());
            template.send("input", "Good".getBytes());
        };
    }

}
spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix: tx-
spring.cloud.stream.kafka.binder.required-acks=all
spring.cloud.stream.bindings.input-in-0.group=foo
spring.cloud.stream.bindings.input-in-0.destination=input
spring.cloud.stream.bindings.left.destination=left
spring.cloud.stream.bindings.right.destination=right

#For @KafkaListeners

spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.isolation-level=read-committed

Fail
...
Fail
...
Fail
...
Good
dlt:Fail
right:good
left:GOOD

删除绑定中的所有 DLT 设置,并将 maxAttempts 设置为 1 以禁用那里的重试。