如何创建带有事务的处理器和带有 Rabbit 绑定的 DLQ?
How to create Processor with Transaction and DLQ with Rabbit binding?
我刚刚开始学习 Spring Cloud Streams 和 Dataflow,我想知道其中一个对我来说很重要的用例。我创建了示例处理器 Multiplier,它接收消息并将其重新发送 5 次以输出。
@EnableBinding(Processor.class)
public class MultiplierProcessor {
@Autowired
private Source source;
private int repeats = 5;
@Transactional
@StreamListener(Processor.INPUT)
public void handle(String payload) {
for (int i = 0; i < repeats; i++) {
if(i == 4) {
throw new RuntimeException("EXCEPTION");
}
source.output().send(new GenericMessage<>(payload));
}
}
}
您可以看到,在第 5 次发送之前,此处理器崩溃了。为什么?因为它可以(程序抛出异常)。在这种情况下,我想在 Spring Cloud Stream 上练习故障预防。
我想要实现的是让输入消息在 DLQ 中得到支持,并且之前发送的 4 条消息被还原并且不被下一个操作数消耗(就像在正常的 JMS 事务中一样)。我已经尝试在我的处理器项目中定义以下属性但没有成功。
spring.cloud.stream.bindings.output.producer.autoBindDlq=true
spring.cloud.stream.bindings.output.producer.republishToDlq=true
spring.cloud.stream.bindings.output.producer.transacted=true
spring.cloud.stream.bindings.input.consumer.autoBindDlq=true
你能告诉我是否可行以及我做错了什么吗?我会非常感谢一些例子。
您的配置有几个问题:
- 兔子特定属性中缺少
.rabbit
)
- 您需要组名和持久订阅才能使用
autoBindDlq
autoBindDlq
不适用于输出端
必须对消费者进行交易,以便生产者发送在同一笔交易中执行。
我刚刚用 1.0 测试了这个。2.RELEASE:
spring.cloud.stream.bindings.output.destination=so8400out
spring.cloud.stream.rabbit.bindings.output.producer.transacted=true
spring.cloud.stream.bindings.input.destination=so8400in
spring.cloud.stream.bindings.input.group=so8400
spring.cloud.stream.rabbit.bindings.input.consumer.durableSubscription=true
spring.cloud.stream.rabbit.bindings.input.consumer.autoBindDlq=true
spring.cloud.stream.rabbit.bindings.input.consumer.transacted=true
它按预期工作。
编辑
实际上,不,发布的消息没有被回滚。正在调查...
EDIT2
好的;它确实有效,但你不能使用 republishToDlq
- 因为当它被启用时,活页夹将失败的消息发布到 DLQ 并且事务被提交。
当为 false 时,向容器抛出异常,回滚事务,RabbitMQ 将失败的消息移至 DLQ。
但是请注意,默认情况下启用重试(3 次尝试),因此,如果您的处理器在重试期间成功,您将在输出中得到重复项。
要使其按您的意愿工作,您需要通过将最大尝试次数设置为 1 来禁用重试(并且不要使用 republishToDlq
)。
EDIT3
好的,如果你想更多地控制错误的发布,当 this JIRA 的修复应用于 Spring AMQP 时,这将起作用...
@SpringBootApplication
@EnableBinding({ Processor.class, So39018400Application.Errors.class })
public class So39018400Application {
public static void main(String[] args) {
SpringApplication.run(So39018400Application.class, args);
}
@Bean
public Foo foo() {
return new Foo();
}
public interface Errors {
@Output("errors")
MessageChannel errorChannel();
}
private static class Foo {
@Autowired
Source source;
@Autowired
Errors errors;
@StreamListener(Processor.INPUT)
public void handle (Message<byte[]> in) {
try {
source.output().send(new GenericMessage<>("foo"));
source.output().send(new GenericMessage<>("foo"));
throw new RuntimeException("foo");
}
catch (RuntimeException e) {
errors.errorChannel().send(MessageBuilder.fromMessage(in)
.setHeader("foo", "bar") // add whatever you want, stack trace etc.
.build());
throw e;
}
}
}
}
具有属性:
spring.cloud.stream.bindings.output.destination=so8400out
spring.cloud.stream.bindings.errors.destination=so8400errors
spring.cloud.stream.rabbit.bindings.errors.producer.transacted=false
spring.cloud.stream.rabbit.bindings.output.producer.transacted=true
spring.cloud.stream.bindings.input.destination=so8400in
spring.cloud.stream.bindings.input.group=so8400
spring.cloud.stream.rabbit.bindings.input.consumer.transacted=true
spring.cloud.stream.rabbit.bindings.input.consumer.requeue-rejected=false
spring.cloud.stream.bindings.input.consumer.max-attempts=1
我刚刚开始学习 Spring Cloud Streams 和 Dataflow,我想知道其中一个对我来说很重要的用例。我创建了示例处理器 Multiplier,它接收消息并将其重新发送 5 次以输出。
@EnableBinding(Processor.class)
public class MultiplierProcessor {
@Autowired
private Source source;
private int repeats = 5;
@Transactional
@StreamListener(Processor.INPUT)
public void handle(String payload) {
for (int i = 0; i < repeats; i++) {
if(i == 4) {
throw new RuntimeException("EXCEPTION");
}
source.output().send(new GenericMessage<>(payload));
}
}
}
您可以看到,在第 5 次发送之前,此处理器崩溃了。为什么?因为它可以(程序抛出异常)。在这种情况下,我想在 Spring Cloud Stream 上练习故障预防。
我想要实现的是让输入消息在 DLQ 中得到支持,并且之前发送的 4 条消息被还原并且不被下一个操作数消耗(就像在正常的 JMS 事务中一样)。我已经尝试在我的处理器项目中定义以下属性但没有成功。
spring.cloud.stream.bindings.output.producer.autoBindDlq=true
spring.cloud.stream.bindings.output.producer.republishToDlq=true
spring.cloud.stream.bindings.output.producer.transacted=true
spring.cloud.stream.bindings.input.consumer.autoBindDlq=true
你能告诉我是否可行以及我做错了什么吗?我会非常感谢一些例子。
您的配置有几个问题:
- 兔子特定属性中缺少
.rabbit
) - 您需要组名和持久订阅才能使用
autoBindDlq
autoBindDlq
不适用于输出端
必须对消费者进行交易,以便生产者发送在同一笔交易中执行。
我刚刚用 1.0 测试了这个。2.RELEASE:
spring.cloud.stream.bindings.output.destination=so8400out
spring.cloud.stream.rabbit.bindings.output.producer.transacted=true
spring.cloud.stream.bindings.input.destination=so8400in
spring.cloud.stream.bindings.input.group=so8400
spring.cloud.stream.rabbit.bindings.input.consumer.durableSubscription=true
spring.cloud.stream.rabbit.bindings.input.consumer.autoBindDlq=true
spring.cloud.stream.rabbit.bindings.input.consumer.transacted=true
它按预期工作。
编辑
实际上,不,发布的消息没有被回滚。正在调查...
EDIT2
好的;它确实有效,但你不能使用 republishToDlq
- 因为当它被启用时,活页夹将失败的消息发布到 DLQ 并且事务被提交。
当为 false 时,向容器抛出异常,回滚事务,RabbitMQ 将失败的消息移至 DLQ。
但是请注意,默认情况下启用重试(3 次尝试),因此,如果您的处理器在重试期间成功,您将在输出中得到重复项。
要使其按您的意愿工作,您需要通过将最大尝试次数设置为 1 来禁用重试(并且不要使用 republishToDlq
)。
EDIT3
好的,如果你想更多地控制错误的发布,当 this JIRA 的修复应用于 Spring AMQP 时,这将起作用...
@SpringBootApplication
@EnableBinding({ Processor.class, So39018400Application.Errors.class })
public class So39018400Application {
public static void main(String[] args) {
SpringApplication.run(So39018400Application.class, args);
}
@Bean
public Foo foo() {
return new Foo();
}
public interface Errors {
@Output("errors")
MessageChannel errorChannel();
}
private static class Foo {
@Autowired
Source source;
@Autowired
Errors errors;
@StreamListener(Processor.INPUT)
public void handle (Message<byte[]> in) {
try {
source.output().send(new GenericMessage<>("foo"));
source.output().send(new GenericMessage<>("foo"));
throw new RuntimeException("foo");
}
catch (RuntimeException e) {
errors.errorChannel().send(MessageBuilder.fromMessage(in)
.setHeader("foo", "bar") // add whatever you want, stack trace etc.
.build());
throw e;
}
}
}
}
具有属性:
spring.cloud.stream.bindings.output.destination=so8400out
spring.cloud.stream.bindings.errors.destination=so8400errors
spring.cloud.stream.rabbit.bindings.errors.producer.transacted=false
spring.cloud.stream.rabbit.bindings.output.producer.transacted=true
spring.cloud.stream.bindings.input.destination=so8400in
spring.cloud.stream.bindings.input.group=so8400
spring.cloud.stream.rabbit.bindings.input.consumer.transacted=true
spring.cloud.stream.rabbit.bindings.input.consumer.requeue-rejected=false
spring.cloud.stream.bindings.input.consumer.max-attempts=1