RetryBackoffSpec 不适用于抛出异常的 KafkaReceiver
RetryBackoffSpec not working with KafkaReceiver which throws exception
我有一个用例,我想 无限地从 Kafka 接收 记录,并使用 processRecord(String record)
对记录进行一些处理,这可能会抛出 RuntimeException
.我想重试多次(比如 5 次),如果在 5 次重试之前的任何时候都成功,我想手动提交偏移量并继续下一条记录,如果不是,那么想(记录它 --> 提交偏移量)然后继续下一个记录。我有代码,但似乎无法正常工作。非常感谢您的帮助。
public class MyClass {
private final AtomicInteger atomicInteger = new AtomicInteger(0);
private final ReceiverOptions<String, String> receiverOptions = getReceiverOptions();
public void consumeRecords() {
RetryBackoffSpec retrySpec = Retry.backoff(5, Duration.ofSeconds(1)).transientErrors(true);
KafkaReceiver.create(receiverOptions)
.receive()
.doOnNext(record -> {
System.out.println(record.value());
processRecord(record.value());
})
.doOnError(e -> System.out.println(atomicInteger.incrementAndGet()))
.onErrorContinue((e, r) -> {
System.out.println(atomicInteger.incrementAndGet());
System.out.println("Record: " + r);
System.out.println("Error: " + e);
})
.retryWhen(retrySpec)
.repeat()
.subscribe();
}
public void processRecord(String record) {
// might throw an exception
throw new RuntimeException("Throwing exception!");
}
}
我收到的输出是:
some message
1
Record: ConsumerRecord(topic = my-topic, partition = 0, leaderEpoch = null, offset = 1, CreateTime = 1620062099518, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = some message)
Error: java.lang.RuntimeException: Throwing exception!
second message
1
Record: ConsumerRecord(topic = my-topic, partition = 1, leaderEpoch = null, offset = 2, CreateTime = 1620062166706, serialized key size = -1, serialized value size = 14, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = second message)
Error: java.lang.RuntimeException: Throwing exception!
它没有重试 5 次,而且 AtomicInteger 没有为第二条记录更新。
我想要实现的是:
count = 0
while (count < 5) {
if (exception) count++;
else break_and_continue_with_next_record
}
if (count == 5) log_failure_and_continue_with_next_record
onErrorResume()
优于 onErrorContinue()
.
那么问题是你不能在那里提交偏移量,因为接收器在那个时候不再活动。
这对我有用...
private final AtomicInteger atomicInteger = new AtomicInteger();
public void consumeRecords(ReceiverOptions<String, String> receiverOptions) {
RetryBackoffSpec retrySpec = Retry.backoff(5, Duration.ofSeconds(1)).transientErrors(true);
KafkaReceiver<String, String> receiver = KafkaReceiver.create(receiverOptions);
AtomicReference<ReceiverRecord<?, ?>> failed = new AtomicReference<>();
receiver.receive()
.subscribeOn(Schedulers.single())
.doOnNext(record -> {
System.out.println(record.value() + "@" + record.offset());
if (failed.get() != null) {
System.out.println("Committing failed record offset " + record.value()
+ "@" + record.offset());
record.receiverOffset().acknowledge();
failed.set(null);
}
else {
atomicInteger.set(0);
try {
processRecord(record.value());
record.receiverOffset().acknowledge();
}
catch (Exception e) {
throw new ReceiverRecordException(record, e);
}
}
})
.doOnError(ex -> atomicInteger.incrementAndGet())
.retryWhen(retrySpec)
.onErrorResume(e -> {
ReceiverRecordException ex = (ReceiverRecordException) e.getCause();
ReceiverRecord<?, ?> record = ex.getRecord();
System.out.println("Retries exhausted for " + record.value()
+ "@" + record.offset());
failed.set(record);
return Mono.empty();
})
.repeat()
.subscribe();
}
public void processRecord(String record) {
// might throw an exception
throw new RuntimeException("Throwing exception!");
}
}
@SuppressWarnings("serial")
class ReceiverRecordException extends RuntimeException {
private final ReceiverRecord record;
ReceiverRecordException(ReceiverRecord record, Throwable t) {
super(t);
this.record = record;
}
public ReceiverRecord getRecord() {
return this.record;
}
}
编辑
这是完整的应用程序...
@SpringBootApplication
public class So67373188Application {
private static final Logger log = LoggerFactory.getLogger(So67373188Application.class);
public static void main(String[] args) throws InterruptedException {
SpringApplication.run(So67373188Application.class, args);
Thread.sleep(120_000);
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("so67373188").partitions(1).replicas(1).build();
}
@Bean
public ApplicationRunner runner2() {
return args -> {
SenderOptions<String, String> so = SenderOptions.create(
Map.of(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class,
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class));
KafkaSender<String, String> sender = KafkaSender.create(so);
Disposable subscribed = sender.send(Flux.just(pr("foo"), pr("bar"), pr("fail"), pr("baz")))
.subscribe(result -> {
System.out.println(result.recordMetadata());
});
Thread.sleep(5000);
subscribed.dispose();
};
}
@Bean
public ApplicationRunner runner3(KafkaOperations<String, String> template) {
return args -> {
DeadLetterPublishingRecoverer dlpr = new DeadLetterPublishingRecoverer(template);
ReceiverOptions<String, String> ro = ReceiverOptions.<String, String> create(
Map.of(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",
ConsumerConfig.GROUP_ID_CONFIG, "so67373188",
ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1,
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"))
.withKeyDeserializer(new StringDeserializer())
.withValueDeserializer(new StringDeserializer())
.addAssignListener(assignments -> log.info("Assigned: " + assignments))
.commitBatchSize(1)
.subscription(Collections.singletonList("so67373188"));
consumeRecords(ro);
};
}
private SenderRecord<String, String, String> pr(String value) {
return SenderRecord.create("so67373188", 0, null, null, value, value + ".corr");
}
private final AtomicInteger atomicInteger = new AtomicInteger();
public void consumeRecords(ReceiverOptions<String, String> receiverOptions) {
RetryBackoffSpec retrySpec = Retry.backoff(5, Duration.ofSeconds(1)).transientErrors(true);
KafkaReceiver<String, String> receiver = KafkaReceiver.create(receiverOptions);
AtomicReference<ReceiverRecord<?, ?>> failed = new AtomicReference<>();
receiver.receive()
.subscribeOn(Schedulers.single())
.doOnNext(record -> {
System.out.println(record.value() + "@" + record.offset());
if (failed.get() != null) {
System.out.println("Committing failed record offset " + record.value()
+ "@" + record.offset());
record.receiverOffset().acknowledge();
failed.set(null);
}
else {
atomicInteger.set(0);
try {
processRecord(record.value());
record.receiverOffset().acknowledge();
}
catch (Exception e) {
throw new ReceiverRecordException(record, e);
}
}
})
.doOnError(ex -> atomicInteger.incrementAndGet())
.retryWhen(retrySpec)
.onErrorResume(e -> {
ReceiverRecordException ex = (ReceiverRecordException) e.getCause();
ReceiverRecord<?, ?> record = ex.getRecord();
System.out.println("Retries exhausted for " + record.value()
+ "@" + record.offset());
failed.set(record);
return Mono.empty();
})
.repeat()
.subscribe();
}
public void processRecord(String record) {
// might throw an exception
if (record.equals("fail")) {
throw new RuntimeException("Throwing exception!");
}
}
}
@SuppressWarnings("serial")
class ReceiverRecordException extends RuntimeException {
private final ReceiverRecord record;
ReceiverRecordException(ReceiverRecord record, Throwable t) {
super(t);
this.record = record;
}
public ReceiverRecord getRecord() {
return this.record;
}
}
结果:
. ____ _ __ _ _
/\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
\/ ___)| |_)| | | | | || (_| | ) ) ) )
' |____| .__|_| |_|_| |_\__, | / / / /
=========|_|==============|___/=/_/_/_/
[32m :: Spring Boot :: [39m [2m (v2.4.5)[0;39m
so67373188-0@16
so67373188-0@17
so67373188-0@18
so67373188-0@19
foo@16
bar@17
fail@18
fail@18
fail@18
fail@18
fail@18
fail@18
Retries exhausted for fail@18
fail@18
Committing failed record offset fail@18
baz@19
我有一个用例,我想 无限地从 Kafka 接收 记录,并使用 processRecord(String record)
对记录进行一些处理,这可能会抛出 RuntimeException
.我想重试多次(比如 5 次),如果在 5 次重试之前的任何时候都成功,我想手动提交偏移量并继续下一条记录,如果不是,那么想(记录它 --> 提交偏移量)然后继续下一个记录。我有代码,但似乎无法正常工作。非常感谢您的帮助。
public class MyClass {
private final AtomicInteger atomicInteger = new AtomicInteger(0);
private final ReceiverOptions<String, String> receiverOptions = getReceiverOptions();
public void consumeRecords() {
RetryBackoffSpec retrySpec = Retry.backoff(5, Duration.ofSeconds(1)).transientErrors(true);
KafkaReceiver.create(receiverOptions)
.receive()
.doOnNext(record -> {
System.out.println(record.value());
processRecord(record.value());
})
.doOnError(e -> System.out.println(atomicInteger.incrementAndGet()))
.onErrorContinue((e, r) -> {
System.out.println(atomicInteger.incrementAndGet());
System.out.println("Record: " + r);
System.out.println("Error: " + e);
})
.retryWhen(retrySpec)
.repeat()
.subscribe();
}
public void processRecord(String record) {
// might throw an exception
throw new RuntimeException("Throwing exception!");
}
}
我收到的输出是:
some message
1
Record: ConsumerRecord(topic = my-topic, partition = 0, leaderEpoch = null, offset = 1, CreateTime = 1620062099518, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = some message)
Error: java.lang.RuntimeException: Throwing exception!
second message
1
Record: ConsumerRecord(topic = my-topic, partition = 1, leaderEpoch = null, offset = 2, CreateTime = 1620062166706, serialized key size = -1, serialized value size = 14, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = second message)
Error: java.lang.RuntimeException: Throwing exception!
它没有重试 5 次,而且 AtomicInteger 没有为第二条记录更新。
我想要实现的是:
count = 0
while (count < 5) {
if (exception) count++;
else break_and_continue_with_next_record
}
if (count == 5) log_failure_and_continue_with_next_record
onErrorResume()
优于 onErrorContinue()
.
那么问题是你不能在那里提交偏移量,因为接收器在那个时候不再活动。
这对我有用...
private final AtomicInteger atomicInteger = new AtomicInteger();
public void consumeRecords(ReceiverOptions<String, String> receiverOptions) {
RetryBackoffSpec retrySpec = Retry.backoff(5, Duration.ofSeconds(1)).transientErrors(true);
KafkaReceiver<String, String> receiver = KafkaReceiver.create(receiverOptions);
AtomicReference<ReceiverRecord<?, ?>> failed = new AtomicReference<>();
receiver.receive()
.subscribeOn(Schedulers.single())
.doOnNext(record -> {
System.out.println(record.value() + "@" + record.offset());
if (failed.get() != null) {
System.out.println("Committing failed record offset " + record.value()
+ "@" + record.offset());
record.receiverOffset().acknowledge();
failed.set(null);
}
else {
atomicInteger.set(0);
try {
processRecord(record.value());
record.receiverOffset().acknowledge();
}
catch (Exception e) {
throw new ReceiverRecordException(record, e);
}
}
})
.doOnError(ex -> atomicInteger.incrementAndGet())
.retryWhen(retrySpec)
.onErrorResume(e -> {
ReceiverRecordException ex = (ReceiverRecordException) e.getCause();
ReceiverRecord<?, ?> record = ex.getRecord();
System.out.println("Retries exhausted for " + record.value()
+ "@" + record.offset());
failed.set(record);
return Mono.empty();
})
.repeat()
.subscribe();
}
public void processRecord(String record) {
// might throw an exception
throw new RuntimeException("Throwing exception!");
}
}
@SuppressWarnings("serial")
class ReceiverRecordException extends RuntimeException {
private final ReceiverRecord record;
ReceiverRecordException(ReceiverRecord record, Throwable t) {
super(t);
this.record = record;
}
public ReceiverRecord getRecord() {
return this.record;
}
}
编辑
这是完整的应用程序...
@SpringBootApplication
public class So67373188Application {
private static final Logger log = LoggerFactory.getLogger(So67373188Application.class);
public static void main(String[] args) throws InterruptedException {
SpringApplication.run(So67373188Application.class, args);
Thread.sleep(120_000);
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("so67373188").partitions(1).replicas(1).build();
}
@Bean
public ApplicationRunner runner2() {
return args -> {
SenderOptions<String, String> so = SenderOptions.create(
Map.of(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class,
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class));
KafkaSender<String, String> sender = KafkaSender.create(so);
Disposable subscribed = sender.send(Flux.just(pr("foo"), pr("bar"), pr("fail"), pr("baz")))
.subscribe(result -> {
System.out.println(result.recordMetadata());
});
Thread.sleep(5000);
subscribed.dispose();
};
}
@Bean
public ApplicationRunner runner3(KafkaOperations<String, String> template) {
return args -> {
DeadLetterPublishingRecoverer dlpr = new DeadLetterPublishingRecoverer(template);
ReceiverOptions<String, String> ro = ReceiverOptions.<String, String> create(
Map.of(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",
ConsumerConfig.GROUP_ID_CONFIG, "so67373188",
ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1,
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"))
.withKeyDeserializer(new StringDeserializer())
.withValueDeserializer(new StringDeserializer())
.addAssignListener(assignments -> log.info("Assigned: " + assignments))
.commitBatchSize(1)
.subscription(Collections.singletonList("so67373188"));
consumeRecords(ro);
};
}
private SenderRecord<String, String, String> pr(String value) {
return SenderRecord.create("so67373188", 0, null, null, value, value + ".corr");
}
private final AtomicInteger atomicInteger = new AtomicInteger();
public void consumeRecords(ReceiverOptions<String, String> receiverOptions) {
RetryBackoffSpec retrySpec = Retry.backoff(5, Duration.ofSeconds(1)).transientErrors(true);
KafkaReceiver<String, String> receiver = KafkaReceiver.create(receiverOptions);
AtomicReference<ReceiverRecord<?, ?>> failed = new AtomicReference<>();
receiver.receive()
.subscribeOn(Schedulers.single())
.doOnNext(record -> {
System.out.println(record.value() + "@" + record.offset());
if (failed.get() != null) {
System.out.println("Committing failed record offset " + record.value()
+ "@" + record.offset());
record.receiverOffset().acknowledge();
failed.set(null);
}
else {
atomicInteger.set(0);
try {
processRecord(record.value());
record.receiverOffset().acknowledge();
}
catch (Exception e) {
throw new ReceiverRecordException(record, e);
}
}
})
.doOnError(ex -> atomicInteger.incrementAndGet())
.retryWhen(retrySpec)
.onErrorResume(e -> {
ReceiverRecordException ex = (ReceiverRecordException) e.getCause();
ReceiverRecord<?, ?> record = ex.getRecord();
System.out.println("Retries exhausted for " + record.value()
+ "@" + record.offset());
failed.set(record);
return Mono.empty();
})
.repeat()
.subscribe();
}
public void processRecord(String record) {
// might throw an exception
if (record.equals("fail")) {
throw new RuntimeException("Throwing exception!");
}
}
}
@SuppressWarnings("serial")
class ReceiverRecordException extends RuntimeException {
private final ReceiverRecord record;
ReceiverRecordException(ReceiverRecord record, Throwable t) {
super(t);
this.record = record;
}
public ReceiverRecord getRecord() {
return this.record;
}
}
结果:
. ____ _ __ _ _
/\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
\/ ___)| |_)| | | | | || (_| | ) ) ) )
' |____| .__|_| |_|_| |_\__, | / / / /
=========|_|==============|___/=/_/_/_/
[32m :: Spring Boot :: [39m [2m (v2.4.5)[0;39m
so67373188-0@16
so67373188-0@17
so67373188-0@18
so67373188-0@19
foo@16
bar@17
fail@18
fail@18
fail@18
fail@18
fail@18
fail@18
Retries exhausted for fail@18
fail@18
Committing failed record offset fail@18
baz@19