DLQ, bounded retry, and EOS when producing to multiple topics using Spring Cloud Stream
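I'm trying to write a transforming function that consumes an input from one topic and produces two outputs, to topics Left and Right. I need this to take place in a transaction, so that if the application fails to produce a message to Right, the attempt to Left is rolled back. Kafka should also re-attempt delivery to the transforming consumer on failure, so that the application gets several chances to recover from transient errors. If the error is a long-term one, however, the application should give up after some number of attempts (say 3), at which point the message should be delivered to a DLQ.
I understand how to achieve bounded retries, a DLQ, and transactions using the imperative model. That is, I can use Function<IN, OUT> and get 90% of a solution. However, as far as I can tell, Function<IN, Tuple2<OUT1, OUT2>> is not currently a supported signature, so I have to use the reactive programming model to deliver to multiple topics. As far as I can tell, automatic DLQ is not part of the reactive consumer framework either. Transactions do not appear to be managed in the same way, and consumer offset commits seem to be automatic rather than conditional on the success or failure of stream processing.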
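Can someone advise me on how to write a consumer that:
- transforms one input into two outputs,
- publishes those outputs to two topics atomically (rolling back if something goes wrong),
- re-attempts ingesting, transforming, and publishing the message up to 3 times on any failure,
- enqueues the message to a DLQ once all retries are exhausted, so that the application is not blocked?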
The rest of this post is not needed to answer my question.
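Here is what I have tried so far, adapted from the single-output imperative function I originally wrote, which met the other requirements. It is meant to consume one integer at a time from the input topic, create an audit_log database record in a transaction (rolling everything back and retrying on failure), transform the number into two JSON payloads labeled "left" and "right", and then publish the JSON to outputs 0 and 1 of the signature. I tried to use the third output (out-2) for the DLQ, since errors from a reactive stream are not automatically sent to a DLQ the way they are in the imperative model. This is exploratory code, just for learning the ropes, so it throws a RuntimeException at random to exercise the failure scenarios.
If an exception occurs, the current message is lost without any retry. If the application publishes one of the two messages and the other is interrupted by an exception, the first one is still committed (no transaction rollback appears to happen). Finally, because all errors are dropped, they never reach my code that tries to route them to the DLQ.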
@Bean
@Transactional
public Function<Flux<Message<Integer>>, Tuple3<Flux<String>, Flux<String>, Flux<Object>>>
    numberToJson(AuditLogRepository repository) {
  var random = new Random();
  // I tried to create two sinks, one for each output topic. This works fine.
  var left = Sinks.many().unicast().<Integer>onBackpressureBuffer();
  var right = Sinks.many().unicast().<Integer>onBackpressureBuffer();
  // I tried to use this sink for the DLQ destination. I was hoping to manually shuffle UE's to the DLQ.
  var dlq = Sinks.many().unicast().onBackpressureBuffer();
  return flux -> {
    // Do a database operation (in a transaction).
    var persistent =
        flux.map(
                message -> {
                  var n = message.getPayload();
                  repository.createIfNotExists("Transformed n=" + n);
                  if (random.nextDouble() < 0.3) {
                    LOGGER.error("Transformer failure on n=" + n);
                    throw new RuntimeException("Transformer failure on n=" + n);
                  }
                  return message;
                })
            .doOnNext(
                message -> {
                  var n = message.getPayload();
                  left.tryEmitNext(n).orThrow();
                  // If only one side fails to publish, we want the txn to roll back.
                  if (random.nextDouble() < 0.1) {
                    LOGGER.error("Failed to publish right-side JSON: n=" + n);
                    throw new RuntimeException("Failed to publish right-side JSON: n=" + n);
                  }
                  right.tryEmitNext(n).orThrow();
                })
            .retry(3) // Make 3 attempts overall to process and publish. If that fails, continue
            // to the DLQ.
            .onErrorContinue(
                (error, message) -> {
                  dlq.tryEmitNext(message).orThrowWithCause(error);
                })
            .retry() // If DLQ fails and this flux crashes, always restart it. The failed message
            // will be redelivered.
            .publish()
            .autoConnect(3);
    // Split the "persistent" flux into three, which map to separate kafka topics and DLQ.
    return Tuples.of(
        left.asFlux()
            .doOnSubscribe(_sub -> persistent.subscribe())
            .map(n -> toJson(n, "left"))
            .retry(),
        right
            .asFlux()
            .doOnSubscribe(_sub -> persistent.subscribe())
            .map(n -> toJson(n, "right"))
            .retry(),
        dlq.asFlux().doOnSubscribe(_sub -> persistent.subscribe()).retry());
  };
}
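Finally, here is the relevant part of my application.yml. It still contains some configuration left over from my imperative attempts, such as the Kafka DLQ settings. I have omitted the publishers and consumers earlier and later in the stream, since I know they work.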
spring:
  cloud:
    stream:
      kafka:
        binder:
          transaction:
            transaction-id-prefix: 'tx-'
            producer:
              configuration:
                retries: 3
                acks: all
        bindings:
          numberToJson-in-0:
            consumer:
              enableDlq: true
          numberToJson-out-0:
            producer:
              topic.properties:
                retention.bytes: 10000
      bindings:
        numberToJson-in-0:
          destination: tx-number
          group: numberToJson
          consumer:
            # Not sure how this interacts with the flux retries, if at all.
            maxAttempts: 2
            properties:
              isolation.level: read_committed
        numberToJson-out-0:
          destination: tx-json-left
          producer:
            partitionCount: 3
        numberToJson-out-1:
          destination: tx-json-right
        numberToJson-out-2:
          # Manually wiring the function's 3rd output to a DLQ.
          destination: error.tx-number.numberToJson
      function:
        definition: numberToJson
[Edit]
Here is the imperative consumer, which otherwise works, after I tried to give it a Tuple2 signature:
@Bean
@Transactional
public Function<Integer, Tuple2<String, String>> numberToJson(
    AuditLogRepository repository) {
  var random = new Random();
  return n -> {
    LOGGER.info("Transforming n=" + n);
    var left = "{ \"n\": \"" + n + "\", \"side\": \"left\" }";
    var right = "{ \"n\": \"" + n + "\", \"side\": \"right\" }";
    repository.createIfNotExists("Transformed n=" + n);
    if (random.nextDouble() < 0.3) {
      LOGGER.error("Transformer failure on n=" + n);
      throw new RuntimeException("Transformer failure on n=" + n);
    }
    return Tuples.of(left, right);
  };
}
However, this fails at runtime with the following exception:
Exception in thread "main" org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'functionInitializer' defined in class path resource [org/springframework/cloud/stream/function/FunctionConfiguration.class]: Invocation of init method failed; nested exception is java.lang.UnsupportedOperationException: At the moment only Tuple-based function are supporting multiple arguments
Thanks!
Use StreamBridge and add a configured AfterRollbackProcessor.
The following example includes consumers on left, right, and input.DLT.
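import java.util.function.Consumer;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.binder.BinderFactory;
import org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder;
import org.springframework.cloud.stream.config.ListenerContainerCustomizer;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultAfterRollbackProcessor;
import org.springframework.messaging.MessageChannel;
import org.springframework.util.backoff.FixedBackOff;

@SpringBootApplication(proxyBeanMethods = false)
public class So68928091Application {

    @Autowired
    StreamBridge bridge;

    public static void main(String[] args) {
        SpringApplication.run(So68928091Application.class, args);
    }

    // Imperative consumer: both sends happen before the exception is thrown,
    // so a failure here exercises the rollback of already-sent records.
    @Bean
    Consumer<String> input() {
        return str -> {
            System.out.println(str);
            this.bridge.send("left", str.toUpperCase());
            this.bridge.send("right", str.toLowerCase());
            if (str.equals("Fail")) {
                throw new RuntimeException("test");
            }
        };
    }

    // Installs the after-rollback processor on the binder's listener container,
    // using a template built from the binder's transactional producer factory.
    @SuppressWarnings({ "rawtypes", "unchecked" })
    @Bean
    ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer(BinderFactory binders) {
        return (container, dest, group) -> {
            ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder(null,
                    MessageChannel.class)).getTransactionalProducerFactory();
            KafkaTemplate<byte[], byte[]> template = new KafkaTemplate<>(pf);
            DefaultAfterRollbackProcessor rollbackProcessor = rollbackProcessor(template);
            container.setAfterRollbackProcessor(rollbackProcessor);
        };
    }

    // FixedBackOff(2000L, 2L): 2 retries, 2 seconds apart (3 deliveries in total);
    // after that the recoverer publishes the record to <topic>.DLT.
    // The final "true" is commitRecovered.
    DefaultAfterRollbackProcessor rollbackProcessor(KafkaTemplate<byte[], byte[]> template) {
        return new DefaultAfterRollbackProcessor<>(
                new DeadLetterPublishingRecoverer(template), new FixedBackOff(2000L, 2L), template, true);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("input.DLT").partitions(1).replicas(1).build();
    }

    @Bean
    public NewTopic topicLeft() {
        return TopicBuilder.name("left").partitions(1).replicas(1).build();
    }

    @Bean
    public NewTopic topicRight() {
        return TopicBuilder.name("right").partitions(1).replicas(1).build();
    }

    @KafkaListener(id = "left", topics = "left")
    public void listenLeft(String in) {
        System.out.println("left:" + in);
    }

    @KafkaListener(id = "right", topics = "right")
    public void listenRight(String in) {
        System.out.println("right:" + in);
    }

    @KafkaListener(id = "dlt", topics = "input.DLT")
    public void listenDlt(String in) {
        System.out.println("dlt:" + in);
    }

    // Press Enter on the console to send one failing and one succeeding record.
    @Bean
    public ApplicationRunner runner(KafkaTemplate<byte[], byte[]> template) {
        return args -> {
            System.in.read();
            template.send("input", "Fail".getBytes());
            template.send("input", "Good".getBytes());
        };
    }

}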
spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix=tx-
spring.cloud.stream.kafka.binder.required-acks=all
spring.cloud.stream.bindings.input-in-0.group=foo
spring.cloud.stream.bindings.input-in-0.destination=input
spring.cloud.stream.bindings.left.destination=left
spring.cloud.stream.bindings.right.destination=right
#For @KafkaListeners
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.isolation-level=read-committed
Fail
...
Fail
...
Fail
...
Good
dlt:Fail
right:good
left:GOOD
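The three Fail lines followed by dlt:Fail match FixedBackOff(2000L, 2L): two retries after the initial delivery, so three attempts before the recoverer publishes to input.DLT. Nothing from the failed attempts shows up on left or right, because the listeners read with read-committed isolation. Below is a broker-free sketch of that control flow in plain Java. The class and method names are made up for illustration and are not Spring or Kafka APIs. The transaction is simulated with an in-memory staging list, and the 2-second back-off delay is left out.

```java
import java.util.ArrayList;
import java.util.List;

class RollbackRetrySketch {

    /** Records visible to read-committed consumers, stored as "topic:value". */
    static final List<String> committed = new ArrayList<>();

    /** Hypothetical stand-in for the input() consumer: stages two sends, then may throw. */
    static void handle(String in, List<String> staged) {
        staged.add("left:" + in.toUpperCase());
        staged.add("right:" + in.toLowerCase());
        if (in.equals("Fail")) {
            throw new RuntimeException("test");
        }
    }

    /**
     * Delivers one record, redelivering it up to maxRetries times after a rollback,
     * then dead-letters it. Returns the number of delivery attempts made.
     */
    static int deliver(String in, long maxRetries) {
        int attempts = 0;
        while (attempts <= maxRetries) {
            attempts++;
            // Sends made inside the open (simulated) transaction.
            List<String> staged = new ArrayList<>();
            try {
                handle(in, staged);
                // Commit: both sends become visible together.
                committed.addAll(staged);
                return attempts;
            } catch (RuntimeException e) {
                // Rollback: the staged sends are discarded and the record is redelivered.
            }
        }
        // Retries exhausted: recover by publishing to the dead-letter topic.
        committed.add("input.DLT:" + in);
        return attempts;
    }

    public static void main(String[] args) {
        int failAttempts = deliver("Fail", 2L);
        int goodAttempts = deliver("Good", 2L);
        System.out.println(failAttempts + " " + goodAttempts + " " + committed);
    }
}
```

Running main prints `3 1 [input.DLT:Fail, left:GOOD, right:good]`: the failing record is attempted three times and only reaches the dead-letter topic, while the good record is committed to both outputs on its first attempt.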
Remove all DLT settings from the binding, and set maxAttempts to 1 to disable retries there.
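For the input-in-0 binding used in the example, that could look like the fragment below. This is a sketch in the same properties style as the configuration above; adjust the binding name to your own function.

```properties
# Let the AfterRollbackProcessor own retries: a single delivery attempt per poll in the binder.
spring.cloud.stream.bindings.input-in-0.consumer.maxAttempts=1
# Leave the binder's own dead-lettering unset (do not set
# spring.cloud.stream.kafka.bindings.input-in-0.consumer.enableDlq), so that
# only the DeadLetterPublishingRecoverer publishes to input.DLT.
```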