如何在 Spring Cloud Stream 中手动确认 RabbitMQ 消息?
How to manually acknowledge RabbitMQ messages in Spring Cloud Stream?
对于基于流的服务,当 @StreamListener
中调用的基础服务失败时,我希望消息保留在队列中。为此,我的理解是,唯一的方法是配置 spring.cloud.stream.bindings.channel_name.consumer.acknowledge-mode=MANUAL
.
进行此配置更改后,我尝试将 @Header(AmqpHeaders.CHANNEL) Channel channel,@Header(AmqpHeaders.DELIVERY_TAG) Long deliveryTag
作为方法参数添加到我现有的 @StreamListener
实现中,如 https://docs.spring.io/spring-integration/reference/html/amqp.html#amqp-inbound-ack 中所述。有了这段代码,我遇到了以下异常:
org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:941)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:851)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:771)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access[=11=]1(SimpleMessageListenerContainer.java:102)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:198)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1311)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:752)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1254)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1224)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access00(SimpleMessageListenerContainer.java:102)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1470)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.springframework.messaging.MessageHandlingException: Missing header 'amqp_channel' for method parameter type [interface com.rabbitmq.client.Channel]
at org.springframework.messaging.handler.annotation.support.HeaderMethodArgumentResolver.handleMissingValue(HeaderMethodArgumentResolver.java:100)
at org.springframework.messaging.handler.annotation.support.AbstractNamedValueMethodArgumentResolver.resolveArgument(AbstractNamedValueMethodArgumentResolver.java:103)
at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:112)
然后我发现了以下内容:https://docs.spring.io/spring-cloud-stream/docs/current/reference/htmlsingle/#_usage_examples,它显示了如何使用 Kafka 执行消息确认的示例,但我目前使用的是 RabbitMQ 绑定。我们计划最终迁移到 Kafka,但就目前而言,我如何配置和编写解决方案以对成功处理的消息进行手动消息确认和手动消息拒绝,从而在遇到异常时将消息留在队列中。我目前在 Spring Cloud Edgware.RELEASE
和 Spring Cloud Stream Ditmars.RELEASE
.
更新
现在我的配置如下:
spring:
cloud:
stream:
bindings:
do-something-async-reply:
group: xyz-service-do-something-async-reply
rabbit:
bindings:
do-something-async-reply:
consumer:
autoBindDlq: true
dlqDeadLetterExchange:
dlqTtl: 10000
requeueRejected: true
我在服务启动时收到以下错误:
2018-01-12 14:46:34.346 ERROR [xyz-service,,,] 2488 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'x-dead-letter-exchange' for queue 'do-something-async-reply.xyz-service-do-something-async-reply' in vhost '/': received the value 'DLX' of type 'longstr' but current is none, class-id=50, method-id=10)
我缺少什么配置wrong/am?
属性名称不正确;你错过了 .rabbit
。这是
spring.cloud.stream.rabbit.bindings.<channel>
.consumer.acknowledge-mode=MANUAL
因为这是兔子特有的 属性 - 参见 the documentation。
编辑
示例:
@SpringBootApplication
@EnableBinding(Sink.class)
public class So481977082Application {
public static void main(String[] args) {
SpringApplication.run(So481977082Application.class, args);
}
@StreamListener(Sink.INPUT)
public void in(String in, @Header(AmqpHeaders.CHANNEL) Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
System.out.println(in);
Thread.sleep(60_000);
channel.basicAck(tag, false);
System.out.println("Ackd");
}
}
请记住,手动确认的需求通常是一种气味;通常让容器处理 acks 会更好;参见 requeueRejected
在同一个 doco link。无条件重新排队会导致无限循环。
EDIT2
适合我...
@SpringBootApplication
@EnableBinding(Processor.class)
public class So48197708Application {
public static void main(String[] args) {
SpringApplication.run(So48197708Application.class, args);
}
@Bean
ApplicationRunner runner(MessageChannel output) {
return args -> {
output.send(new GenericMessage<>("foo"));
};
}
@StreamListener(Sink.INPUT)
public void listen(@Header(name = "x-death", required = false) List<?> death) {
System.out.println(death);
throw new RuntimeException("x");
}
}
和
spring:
cloud:
stream:
bindings:
input:
group: foo
content-type: application/json
destination: foo
consumer:
max-attempts: 1
output:
content-type: application/json
destination: foo
rabbit:
bindings:
input:
consumer:
auto-bind-dlq: true
dlqDeadLetterExchange:
dlqTtl: 10000
结果:
null
...
Caused by: java.lang.RuntimeException: x
...
[{reason=expired, count=1, exchange=DLX, routing-keys=[foo.foo], time=Fri Jan 12 17:20:28 EST 2018, queue=foo.foo.dlq},
{reason=rejected, count=1, exchange=foo, time=Fri Jan 12 17:20:18 EST 2018, routing-keys=[foo], queue=foo.foo}]
...
...
[{reason=expired, count=3, exchange=DLX, routing-keys=[foo.foo], time=Fri Jan 12 17:20:28 EST 2018, queue=foo.foo.dlq},
{reason=rejected, count=3, exchange=foo, routing-keys=[foo], time=Fri Jan 12 17:20:18 EST 2018, queue=foo.foo}]
对于基于流的服务,当 @StreamListener
中调用的基础服务失败时,我希望消息保留在队列中。为此,我的理解是,唯一的方法是配置 spring.cloud.stream.bindings.channel_name.consumer.acknowledge-mode=MANUAL
.
进行此配置更改后,我尝试将 @Header(AmqpHeaders.CHANNEL) Channel channel,@Header(AmqpHeaders.DELIVERY_TAG) Long deliveryTag
作为方法参数添加到我现有的 @StreamListener
实现中,如 https://docs.spring.io/spring-integration/reference/html/amqp.html#amqp-inbound-ack 中所述。有了这段代码,我遇到了以下异常:
org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:941)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:851)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:771)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access[=11=]1(SimpleMessageListenerContainer.java:102)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:198)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1311)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:752)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1254)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1224)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access00(SimpleMessageListenerContainer.java:102)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1470)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.springframework.messaging.MessageHandlingException: Missing header 'amqp_channel' for method parameter type [interface com.rabbitmq.client.Channel]
at org.springframework.messaging.handler.annotation.support.HeaderMethodArgumentResolver.handleMissingValue(HeaderMethodArgumentResolver.java:100)
at org.springframework.messaging.handler.annotation.support.AbstractNamedValueMethodArgumentResolver.resolveArgument(AbstractNamedValueMethodArgumentResolver.java:103)
at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:112)
然后我发现了以下内容:https://docs.spring.io/spring-cloud-stream/docs/current/reference/htmlsingle/#_usage_examples,它显示了如何使用 Kafka 执行消息确认的示例,但我目前使用的是 RabbitMQ 绑定。我们计划最终迁移到 Kafka,但就目前而言,我如何配置和编写解决方案以对成功处理的消息进行手动消息确认和手动消息拒绝,从而在遇到异常时将消息留在队列中。我目前在 Spring Cloud Edgware.RELEASE
和 Spring Cloud Stream Ditmars.RELEASE
.
更新
现在我的配置如下:
spring:
cloud:
stream:
bindings:
do-something-async-reply:
group: xyz-service-do-something-async-reply
rabbit:
bindings:
do-something-async-reply:
consumer:
autoBindDlq: true
dlqDeadLetterExchange:
dlqTtl: 10000
requeueRejected: true
我在服务启动时收到以下错误:
2018-01-12 14:46:34.346 ERROR [xyz-service,,,] 2488 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'x-dead-letter-exchange' for queue 'do-something-async-reply.xyz-service-do-something-async-reply' in vhost '/': received the value 'DLX' of type 'longstr' but current is none, class-id=50, method-id=10)
我缺少什么配置wrong/am?
属性名称不正确;你错过了 .rabbit
。这是
spring.cloud.stream.rabbit.bindings.
<channel>
.consumer.acknowledge-mode=MANUAL
因为这是兔子特有的 属性 - 参见 the documentation。
编辑
示例:
@SpringBootApplication
@EnableBinding(Sink.class)
public class So481977082Application {
public static void main(String[] args) {
SpringApplication.run(So481977082Application.class, args);
}
@StreamListener(Sink.INPUT)
public void in(String in, @Header(AmqpHeaders.CHANNEL) Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
System.out.println(in);
Thread.sleep(60_000);
channel.basicAck(tag, false);
System.out.println("Ackd");
}
}
请记住,手动确认的需求通常是一种气味;通常让容器处理 acks 会更好;参见 requeueRejected
在同一个 doco link。无条件重新排队会导致无限循环。
EDIT2
适合我...
@SpringBootApplication
@EnableBinding(Processor.class)
public class So48197708Application {
public static void main(String[] args) {
SpringApplication.run(So48197708Application.class, args);
}
@Bean
ApplicationRunner runner(MessageChannel output) {
return args -> {
output.send(new GenericMessage<>("foo"));
};
}
@StreamListener(Sink.INPUT)
public void listen(@Header(name = "x-death", required = false) List<?> death) {
System.out.println(death);
throw new RuntimeException("x");
}
}
和
spring:
cloud:
stream:
bindings:
input:
group: foo
content-type: application/json
destination: foo
consumer:
max-attempts: 1
output:
content-type: application/json
destination: foo
rabbit:
bindings:
input:
consumer:
auto-bind-dlq: true
dlqDeadLetterExchange:
dlqTtl: 10000
结果:
null
...
Caused by: java.lang.RuntimeException: x
...
[{reason=expired, count=1, exchange=DLX, routing-keys=[foo.foo], time=Fri Jan 12 17:20:28 EST 2018, queue=foo.foo.dlq},
{reason=rejected, count=1, exchange=foo, time=Fri Jan 12 17:20:18 EST 2018, routing-keys=[foo], queue=foo.foo}]
...
...
[{reason=expired, count=3, exchange=DLX, routing-keys=[foo.foo], time=Fri Jan 12 17:20:28 EST 2018, queue=foo.foo.dlq},
{reason=rejected, count=3, exchange=foo, routing-keys=[foo], time=Fri Jan 12 17:20:18 EST 2018, queue=foo.foo}]