RetryBackoffSpec not working with KafkaReceiver which throws exception

I have a use case where I want to receive records from Kafka indefinitely and process each one with processRecord(String record), which may throw a RuntimeException. I want to retry a number of times (say 5); if processing succeeds at any point before the 5 retries are exhausted, I want to manually commit the offset and continue with the next record. If all retries fail, I want to log the failure, commit the offset anyway, and then continue with the next record. I have the code below, but it doesn't seem to work as expected. Any help is much appreciated.

public class MyClass {
    private final AtomicInteger atomicInteger = new AtomicInteger(0);
    private final ReceiverOptions<String, String> receiverOptions = getReceiverOptions();

    public void consumeRecords() {
        RetryBackoffSpec retrySpec = Retry.backoff(5, Duration.ofSeconds(1)).transientErrors(true);
        KafkaReceiver.create(receiverOptions)
                .receive()
                .doOnNext(record -> {
                    System.out.println(record.value());
                    processRecord(record.value());
                })
                .doOnError(e -> System.out.println(atomicInteger.incrementAndGet()))
                .onErrorContinue((e, r) -> {
                    System.out.println(atomicInteger.incrementAndGet());
                    System.out.println("Record: " + r);
                    System.out.println("Error: " + e);
                })
                .retryWhen(retrySpec)
                .repeat()
                .subscribe();

    }

    public void processRecord(String record) {
        // might throw an exception
        throw new RuntimeException("Throwing exception!");
    }
}

The output I get is:

some message
1
Record: ConsumerRecord(topic = my-topic, partition = 0, leaderEpoch = null, offset = 1, CreateTime = 1620062099518, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = some message)
Error: java.lang.RuntimeException: Throwing exception!

second message
1
Record: ConsumerRecord(topic = my-topic, partition = 1, leaderEpoch = null, offset = 2, CreateTime = 1620062166706, serialized key size = -1, serialized value size = 14, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = second message)
Error: java.lang.RuntimeException: Throwing exception!

It does not retry 5 times, and the AtomicInteger is not updated for the second record.

What I want to achieve is:

count = 0
while (count < 5) {
    if (exception) count++;
    else break_and_continue_with_next_record
}

if (count == 5) log_failure_and_continue_with_next_record
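The pseudocode above can be sketched as a plain blocking Java loop. This is only a model of the desired per-record semantics, not the reactive solution; handle(), the failuresBeforeSuccess parameter, and the commits counter are hypothetical stand-ins for processRecord() and a manual offset commit:

```java
// Sketch of the desired per-record semantics as a plain blocking loop.
// failuresBeforeSuccess simulates a record whose processing fails that
// many times before succeeding; commits counts simulated offset commits.
public class RetrySketch {
    static int commits = 0;

    // Returns true if the record was eventually processed, false if the
    // attempts were exhausted; the offset is committed in either case.
    static boolean handle(int failuresBeforeSuccess, int maxAttempts) {
        int failures = 0;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            if (failures >= failuresBeforeSuccess) { // "processing" succeeds
                commits++;                           // commit and move on
                return true;
            }
            failures++; // "processing" threw; count the failure and retry
        }
        commits++;      // retries exhausted: log, commit anyway, move on
        return false;
    }

    public static void main(String[] args) {
        System.out.println(handle(2, 5)); // succeeds on the 3rd attempt
        System.out.println(handle(9, 5)); // never succeeds within 5 attempts
        System.out.println(commits);      // offset committed both times
    }
}
```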

onErrorResume() is preferred over onErrorContinue(). With onErrorContinue(), the error is intercepted and the sequence simply continues, so the downstream retryWhen() never sees an error signal; that is why no retries happen and why the counter is not incremented past 1 for each record.

The problem then is that you can't commit the offset there, because the receiver is no longer active at that point.

This works for me...

    private final AtomicInteger atomicInteger = new AtomicInteger();

    public void consumeRecords(ReceiverOptions<String, String> receiverOptions) {
        RetryBackoffSpec retrySpec = Retry.backoff(5, Duration.ofSeconds(1)).transientErrors(true);
        KafkaReceiver<String, String> receiver = KafkaReceiver.create(receiverOptions);
        AtomicReference<ReceiverRecord<?, ?>> failed = new AtomicReference<>();
        receiver.receive()
                .subscribeOn(Schedulers.single())
                .doOnNext(record -> {
                    System.out.println(record.value() + "@" + record.offset());
                    if (failed.get() != null) {
                        System.out.println("Committing failed record offset " + record.value()
                                + "@" + record.offset());
                        record.receiverOffset().acknowledge();
                        failed.set(null);
                    }
                    else {
                        atomicInteger.set(0);
                        try {
                            processRecord(record.value());
                            record.receiverOffset().acknowledge();
                        }
                        catch (Exception e) {
                            throw new ReceiverRecordException(record, e);
                        }
                    }
                })
                .doOnError(ex -> atomicInteger.incrementAndGet())
                .retryWhen(retrySpec)
                .onErrorResume(e -> {
                    ReceiverRecordException ex = (ReceiverRecordException) e.getCause();
                    ReceiverRecord<?, ?> record = ex.getRecord();
                    System.out.println("Retries exhausted for " + record.value()
                            + "@" + record.offset());
                    failed.set(record);
                    return Mono.empty();
                })
                .repeat()
                .subscribe();
    }

    public void processRecord(String record) {
        // might throw an exception
        throw new RuntimeException("Throwing exception!");
    }

}

@SuppressWarnings("serial")
class ReceiverRecordException extends RuntimeException {

    private final ReceiverRecord<?, ?> record;

    ReceiverRecordException(ReceiverRecord<?, ?> record, Throwable t) {
        super(t);
        this.record = record;
    }

    public ReceiverRecord<?, ?> getRecord() {
        return this.record;
    }

}

EDIT

Here is the complete application...

@SpringBootApplication
public class So67373188Application {

    private static final Logger log = LoggerFactory.getLogger(So67373188Application.class);

    public static void main(String[] args) throws InterruptedException {
        SpringApplication.run(So67373188Application.class, args);
        Thread.sleep(120_000);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so67373188").partitions(1).replicas(1).build();
    }

    @Bean
    public ApplicationRunner runner2() {
        return args -> {
            SenderOptions<String, String> so = SenderOptions.create(
                    Map.of(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",
                            ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class,
                            ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class));
            KafkaSender<String, String> sender = KafkaSender.create(so);
            Disposable subscribed = sender.send(Flux.just(pr("foo"), pr("bar"), pr("fail"), pr("baz")))
                .subscribe(result -> {
                    System.out.println(result.recordMetadata());
                });
            Thread.sleep(5000);
            subscribed.dispose();
        };
    }

    @Bean
    public ApplicationRunner runner3(KafkaOperations<String, String> template) {
        return args -> {
            DeadLetterPublishingRecoverer dlpr = new DeadLetterPublishingRecoverer(template);
            ReceiverOptions<String, String> ro = ReceiverOptions.<String, String> create(
                    Map.of(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",
                            ConsumerConfig.GROUP_ID_CONFIG, "so67373188",
                            ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1,
                            ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"))
                    .withKeyDeserializer(new StringDeserializer())
                    .withValueDeserializer(new StringDeserializer())
                    .addAssignListener(assignments -> log.info("Assigned: " + assignments))
                    .commitBatchSize(1)
                    .subscription(Collections.singletonList("so67373188"));
            consumeRecords(ro);
        };
    }

    private SenderRecord<String, String, String> pr(String value) {
        return SenderRecord.create("so67373188", 0, null, null, value, value + ".corr");
    }

    private final AtomicInteger atomicInteger = new AtomicInteger();

    public void consumeRecords(ReceiverOptions<String, String> receiverOptions) {
        RetryBackoffSpec retrySpec = Retry.backoff(5, Duration.ofSeconds(1)).transientErrors(true);
        KafkaReceiver<String, String> receiver = KafkaReceiver.create(receiverOptions);
        AtomicReference<ReceiverRecord<?, ?>> failed = new AtomicReference<>();
        receiver.receive()
                .subscribeOn(Schedulers.single())
                .doOnNext(record -> {
                    System.out.println(record.value() + "@" + record.offset());
                    if (failed.get() != null) {
                        System.out.println("Committing failed record offset " + record.value()
                                + "@" + record.offset());
                        record.receiverOffset().acknowledge();
                        failed.set(null);
                    }
                    else {
                        atomicInteger.set(0);
                        try {
                            processRecord(record.value());
                            record.receiverOffset().acknowledge();
                        }
                        catch (Exception e) {
                            throw new ReceiverRecordException(record, e);
                        }
                    }
                })
                .doOnError(ex -> atomicInteger.incrementAndGet())
                .retryWhen(retrySpec)
                .onErrorResume(e -> {
                    ReceiverRecordException ex = (ReceiverRecordException) e.getCause();
                    ReceiverRecord<?, ?> record = ex.getRecord();
                    System.out.println("Retries exhausted for " + record.value()
                            + "@" + record.offset());
                    failed.set(record);
                    return Mono.empty();
                })
                .repeat()
                .subscribe();
    }

    public void processRecord(String record) {
        // might throw an exception
        if (record.equals("fail")) {
            throw new RuntimeException("Throwing exception!");
        }
    }

}

@SuppressWarnings("serial")
class ReceiverRecordException extends RuntimeException {

    private final ReceiverRecord<?, ?> record;

    ReceiverRecordException(ReceiverRecord<?, ?> record, Throwable t) {
        super(t);
        this.record = record;
    }

    public ReceiverRecord<?, ?> getRecord() {
        return this.record;
    }

}

Result:


 :: Spring Boot ::              (v2.4.5)

so67373188-0@16
so67373188-0@17
so67373188-0@18
so67373188-0@19
foo@16
bar@17
fail@18
fail@18
fail@18
fail@18
fail@18
fail@18
Retries exhausted for fail@18
fail@18
Committing failed record offset fail@18
baz@19