spring amqp 死信队列不适用于事务
spring amqp dead letter queue does not work with transaction
当 channelTransacted(true) 发生错误而不是重新排队时,我如何设置监听器容器将消息扔进死信队列?当我不使用 channelTransacted 时,一切正常,我可以在死信队列中看到消息。
@Bean(name = "amqpInboundEsignRequest")
public IntegrationFlow amqpInbound(ConnectionFactory connectionFactory, PlatformTransactionManager transactionManager) {
return IntegrationFlows.from(
Amqp.inboundAdapter(connectionFactory, esignIAutoRequestQueue())
.acknowledgeMode(AcknowledgeMode.AUTO)
.messageConverter(new Jackson2JsonMessageConverter())
.autoStartup(false)
.defaultRequeueRejected(false)
//.channelTransacted(true) // dead letter does not work
//.transactionManager(transactionManager) // dead letter does not work
)
.log("amqpInbound.start-process")
.handle(p -> {
throw new RuntimeException("Something wrong!");
})
.get();
}
编辑
这些是依赖项的版本。
[INFO] +- org.springframework.boot:spring-boot-starter-amqp:jar:1.5.9.RELEASE:compile
[INFO] | +- org.springframework:spring-messaging:jar:4.3.13.RELEASE:compile
[INFO] | \- org.springframework.amqp:spring-rabbit:jar:1.7.4.RELEASE:compile
[INFO] | +- com.rabbitmq:http-client:jar:1.1.1.RELEASE:compile
[INFO] | +- com.rabbitmq:amqp-client:jar:4.0.3:compile
[INFO] | +- org.springframework.retry:spring-retry:jar:1.2.1.RELEASE:compile
[INFO] | \- org.springframework.amqp:spring-amqp:jar:1.7.4.RELEASE:compile
[INFO] +- org.springframework.boot:spring-boot-starter-integration:jar:1.5.9.RELEASE:compile
[INFO] | +- org.springframework.integration:spring-integration-core:jar:4.3.12.RELEASE:compile
[INFO] | \- org.springframework.integration:spring-integration-java-dsl:jar:1.2.3.RELEASE:compile
[INFO] | \- org.reactivestreams:reactive-streams:jar:1.0.0:compile
我想让交易与外部交易数据库 (PlatformTransactionManager) 同步。当我在侦听器容器上设置 transactionManager(transactionManager) 时,它总是重新排队。
您使用的是什么版本?我刚刚用 Spring Integration 5.0.2 和 Spring AMQP 2.0.2,以及 Spring Integration 4.3.14 和 Spring AMQP 1.7 测试了 channelTransacted=true
。 6 一切都按预期工作,失败的消息转到 DLQ。
@SpringBootApplication
public class So48914749Application {
public static void main(String[] args) {
SpringApplication.run(So48914749Application.class, args);
}
@Bean
public IntegrationFlow amqpInbound(ConnectionFactory connectionFactory) {
return IntegrationFlows.from(
Amqp.inboundAdapter(connectionFactory, "foo")
.acknowledgeMode(AcknowledgeMode.AUTO)
.messageConverter(new Jackson2JsonMessageConverter())
.autoStartup(true)
.defaultRequeueRejected(false)
.channelTransacted(true) // dead letter does not work
// .transactionManager(transactionManager) // dead letter does not work
)
.log("amqpInbound.start-process")
.handle(p -> {
throw new RuntimeException("Something wrong!");
})
.get();
}
@Bean
public Queue queue() {
return QueueBuilder.durable("foo")
.withArgument("x-dead-letter-exchange", "")
.withArgument("x-dead-letter-routing-key", "dlq")
.build();
}
@Bean
public Queue dlq() {
return new Queue("dlq");
}
@RabbitListener(queues = "dlq")
public void dlq(Message in) {
System.out.println(in);
}
}
结果:
(Body:'[B@17793549(byte[8])' MessageProperties [headers={x-first-death-exchange=, x-death=[{reason=rejected, count=1, exchange=, time=Wed Feb 21 16:43:48 EST 2018, routing-keys=[foo], queue=foo}], x-first-death-reason=rejected, x-first-death-queue=foo}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=null, contentEncoding=null, contentLength=0, deliveryMode=null, receivedDeliveryMode=NON_PERSISTENT, expiration=null, priority=null, redelivered=false, receivedExchange=, receivedRoutingKey=dlq, receivedDelay=null, deliveryTag=4, messageCount=0, consumerTag=amq.ctag-dOypXDkkQ5Hvw2W9cUIpzg, consumerQueue=dlq])
编辑
以后请更准确;您的问题暗示无论是否提供交易管理器,都会发生同样的问题。
参见 the second NOTE under A note on Rollback of Received Messages。这在 2.0 中已修复。
在 1.7.x 中,为了向后兼容,我们必须默认保留旧行为。 1.7.x documentation 说明您必须将 alwaysRequeueWithTxManagerRollback
设置为 false
以获得新的(一致的)行为。
这是一个侦听器容器属性,因此您需要连接一个容器并将其注入入站通道适配器。
当 channelTransacted(true) 发生错误而不是重新排队时,我如何设置监听器容器将消息扔进死信队列?当我不使用 channelTransacted 时,一切正常,我可以在死信队列中看到消息。
@Bean(name = "amqpInboundEsignRequest")
public IntegrationFlow amqpInbound(ConnectionFactory connectionFactory, PlatformTransactionManager transactionManager) {
return IntegrationFlows.from(
Amqp.inboundAdapter(connectionFactory, esignIAutoRequestQueue())
.acknowledgeMode(AcknowledgeMode.AUTO)
.messageConverter(new Jackson2JsonMessageConverter())
.autoStartup(false)
.defaultRequeueRejected(false)
//.channelTransacted(true) // dead letter does not work
//.transactionManager(transactionManager) // dead letter does not work
)
.log("amqpInbound.start-process")
.handle(p -> {
throw new RuntimeException("Something wrong!");
})
.get();
}
编辑
这些是依赖项的版本。
[INFO] +- org.springframework.boot:spring-boot-starter-amqp:jar:1.5.9.RELEASE:compile
[INFO] | +- org.springframework:spring-messaging:jar:4.3.13.RELEASE:compile
[INFO] | \- org.springframework.amqp:spring-rabbit:jar:1.7.4.RELEASE:compile
[INFO] | +- com.rabbitmq:http-client:jar:1.1.1.RELEASE:compile
[INFO] | +- com.rabbitmq:amqp-client:jar:4.0.3:compile
[INFO] | +- org.springframework.retry:spring-retry:jar:1.2.1.RELEASE:compile
[INFO] | \- org.springframework.amqp:spring-amqp:jar:1.7.4.RELEASE:compile
[INFO] +- org.springframework.boot:spring-boot-starter-integration:jar:1.5.9.RELEASE:compile
[INFO] | +- org.springframework.integration:spring-integration-core:jar:4.3.12.RELEASE:compile
[INFO] | \- org.springframework.integration:spring-integration-java-dsl:jar:1.2.3.RELEASE:compile
[INFO] | \- org.reactivestreams:reactive-streams:jar:1.0.0:compile
我想让交易与外部交易数据库 (PlatformTransactionManager) 同步。当我在侦听器容器上设置 transactionManager(transactionManager) 时,它总是重新排队。
您使用的是什么版本?我刚刚用 Spring Integration 5.0.2 和 Spring AMQP 2.0.2,以及 Spring Integration 4.3.14 和 Spring AMQP 1.7 测试了 channelTransacted=true
。 6 一切都按预期工作,失败的消息转到 DLQ。
@SpringBootApplication
public class So48914749Application {
public static void main(String[] args) {
SpringApplication.run(So48914749Application.class, args);
}
@Bean
public IntegrationFlow amqpInbound(ConnectionFactory connectionFactory) {
return IntegrationFlows.from(
Amqp.inboundAdapter(connectionFactory, "foo")
.acknowledgeMode(AcknowledgeMode.AUTO)
.messageConverter(new Jackson2JsonMessageConverter())
.autoStartup(true)
.defaultRequeueRejected(false)
.channelTransacted(true) // dead letter does not work
// .transactionManager(transactionManager) // dead letter does not work
)
.log("amqpInbound.start-process")
.handle(p -> {
throw new RuntimeException("Something wrong!");
})
.get();
}
@Bean
public Queue queue() {
return QueueBuilder.durable("foo")
.withArgument("x-dead-letter-exchange", "")
.withArgument("x-dead-letter-routing-key", "dlq")
.build();
}
@Bean
public Queue dlq() {
return new Queue("dlq");
}
@RabbitListener(queues = "dlq")
public void dlq(Message in) {
System.out.println(in);
}
}
结果:
(Body:'[B@17793549(byte[8])' MessageProperties [headers={x-first-death-exchange=, x-death=[{reason=rejected, count=1, exchange=, time=Wed Feb 21 16:43:48 EST 2018, routing-keys=[foo], queue=foo}], x-first-death-reason=rejected, x-first-death-queue=foo}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=null, contentEncoding=null, contentLength=0, deliveryMode=null, receivedDeliveryMode=NON_PERSISTENT, expiration=null, priority=null, redelivered=false, receivedExchange=, receivedRoutingKey=dlq, receivedDelay=null, deliveryTag=4, messageCount=0, consumerTag=amq.ctag-dOypXDkkQ5Hvw2W9cUIpzg, consumerQueue=dlq])
编辑
以后请更准确;您的问题暗示无论是否提供交易管理器,都会发生同样的问题。
参见 the second NOTE under A note on Rollback of Received Messages。这在 2.0 中已修复。
在 1.7.x 中,为了向后兼容,我们必须默认保留旧行为。 1.7.x documentation 说明您必须将 alwaysRequeueWithTxManagerRollback
设置为 false
以获得新的(一致的)行为。
这是一个侦听器容器属性,因此您需要连接一个容器并将其注入入站通道适配器。