通过 errorChannel 重新抛出异常并配合 JmsTransactionManager 时,Spring Integration 的重新投递不遵守 maximumRedeliveries
Spring Integration redelivery via errorChannel throw with JmsTransactionManager doesn't honor maximumRedeliveries
与 SO 问题相关:
在 maximumRedeliveries 设置为 3 的 connectionFactory 上使用事务性轮询器和 JmsTransactionManager,会导致实际的重新投递(redelivery)尝试次数翻倍。
我怎样才能让它遵守连接工厂的重新投递设置?
我的 connectionFactory 构建为:
/**
 * Declares the ActiveMQ connection factory bean named "spring-int-connection-factory".
 *
 * <p>Delegates to {@code buildConnectionFactory}, passing the configured broker URL,
 * {@code DELAY_2_SECS}, {@code MAX_REDELIVERIES} and the "spring-int" client ID prefix.
 *
 * @return the factory produced by {@code buildConnectionFactory}
 */
@Bean(name = "spring-int-connection-factory")
ActiveMQConnectionFactory jmsConnectionFactory() {
    final String clientIdPrefix = "spring-int";
    return buildConnectionFactory(brokerUrl, DELAY_2_SECS, MAX_REDELIVERIES, clientIdPrefix);
}
/**
 * Builds an {@link ActiveMQConnectionFactory} for the given broker, optionally with a
 * custom redelivery policy.
 *
 * <p>When {@code maxRedeliveries} is non-null, a {@link RedeliveryPolicy} is created with
 * {@code retryDelay} as both its initial and subsequent redelivery delay and
 * {@code maxRedeliveries} as its maximum, and is attached to the factory. When
 * {@code maxRedeliveries} is null, no policy is configured and the factory's own default
 * applies.
 *
 * @param brokerUrl       broker URL set on the factory
 * @param retryDelay      delay passed to both {@code setInitialRedeliveryDelay} and
 *                        {@code setRedeliveryDelay}; defaults to 500 when null
 * @param maxRedeliveries maximum number of redeliveries, or null to skip policy configuration
 * @param clientIdPrefix  client ID prefix set on the factory
 * @return the configured connection factory
 */
public static ActiveMQConnectionFactory buildConnectionFactory(String brokerUrl, Long retryDelay, Integer maxRedeliveries, String clientIdPrefix){
    ActiveMQConnectionFactory amqcf = new ActiveMQConnectionFactory();
    amqcf.setBrokerURL(brokerUrl);
    amqcf.setClientIDPrefix(clientIdPrefix);
    if (maxRedeliveries != null) {
        if (retryDelay == null) {
            retryDelay = 500L;
        }
        RedeliveryPolicy rp = new org.apache.activemq.RedeliveryPolicy();
        rp.setInitialRedeliveryDelay(retryDelay);
        rp.setRedeliveryDelay(retryDelay);
        rp.setMaximumRedeliveries(maxRedeliveries);
        // Attach the policy to the factory. Without this call the policy object is
        // discarded and the factory keeps its default redelivery settings, so the
        // requested maximum is never honored.
        amqcf.setRedeliveryPolicy(rp);
    }
    return amqcf;
}
我的轮询流程如下:
/**
 * Defines a polled JMS inbound flow that reads from {@code INPUT_DIRECT_QUEUE} and hands
 * each message to {@code msgHandler}.
 *
 * <p>The adapter's JmsTemplate uses a receive timeout of 1000 and a transacted session.
 * The poller runs with a fixed delay of 5000, is transactional, routes errors to
 * "customErrorChannel" and handles at most 2 messages per poll.
 *
 * @param connectionFactory the "spring-int-connection-factory" bean used by the inbound adapter
 * @return the assembled integration flow
 */
@Bean
public IntegrationFlow flow2(@Qualifier("spring-int-connection-factory") ConnectionFactory connectionFactory) {
    return IntegrationFlows
            .from(Jms.inboundAdapter(connectionFactory)
                            .configureJmsTemplate(template -> template.receiveTimeout(1000).sessionTransacted(true))
                            .destination(INPUT_DIRECT_QUEUE),
                    endpoint -> endpoint.poller(Pollers.fixedDelay(5000)
                            .transactional()
                            .errorChannel("customErrorChannel")
                            .maxMessagesPerPoll(2)))
            .handle(this.msgHandler)
            .get();
}
我的 errorChannel 处理程序只是重新抛出,这导致 JMS 重新传递发生。
当我将处理程序设置为始终抛出异常并运行时,我看到消息处理程序实际收到消息 7 次(1 次初始投递和 6 次重新投递)。
根据我的 connectionFactory 配置,我预计只有 3 次重新传送。
知道是什么原因导致尝试次数加倍以及如何缓解吗?
这在我这里工作正常——投递计数停在 4(1 次初始投递加 3 次重新投递)...
/**
 * Minimal Spring Boot application demonstrating JMS redelivery with a transactional
 * poller and a {@link JmsTransactionManager}.
 *
 * <p>It sends one "test" message to the "foo" destination, polls that destination with a
 * handler that always throws, limits redeliveries to 3 on the ActiveMQ connection factory,
 * and logs whatever arrives on "ActiveMQ.DLQ".
 */
@SpringBootApplication
public class So51792909Application {

    private static final Logger logger = LoggerFactory.getLogger(So51792909Application.class);

    public static void main(String[] args) {
        SpringApplication.run(So51792909Application.class, args);
    }

    /**
     * Returns a runner that sends a single "test" message to the "foo" destination.
     *
     * @param template the JMS template used for sending
     */
    @Bean
    public ApplicationRunner runner(JmsTemplate template) {
        return args -> {
            // Counter-style loop so the number of test messages is easy to raise.
            int sent = 0;
            while (sent < 1) {
                template.convertAndSend("foo", "test");
                sent++;
            }
        };
    }

    /**
     * Polls the "foo" destination transactionally (fixed delay 5000, at most 2 messages per
     * poll) and passes each message to a handler that prints the "JMSXDeliveryCount" header,
     * sleeps for 2000 ms and then always throws a {@link RuntimeException}.
     *
     * @param connectionFactory the connection factory used by the inbound adapter
     */
    @Bean
    public IntegrationFlow flow(ConnectionFactory connectionFactory) {
        return IntegrationFlows
                .from(Jms.inboundAdapter(connectionFactory).destination("foo"),
                        endpoint -> endpoint.poller(Pollers.fixedDelay(5000)
                                .transactional()
                                .maxMessagesPerPoll(2)))
                .handle((payload, headers) -> {
                    System.out.println(headers.get("JMSXDeliveryCount"));
                    try {
                        Thread.sleep(2000);
                    }
                    catch (InterruptedException interrupted) {
                        // Restore the interrupt flag, then fall through to the failure below.
                        Thread.currentThread().interrupt();
                    }
                    throw new RuntimeException("foo");
                })
                .get();
    }

    /**
     * Creates the JMS transaction manager bound to the given connection factory.
     */
    @Bean
    public JmsTransactionManager transactionManager(ConnectionFactory cf) {
        JmsTransactionManager manager = new JmsTransactionManager(cf);
        return manager;
    }

    /**
     * Creates an in-VM, non-persistent ActiveMQ connection factory whose redelivery policy
     * allows at most 3 redeliveries.
     */
    @Bean
    public ActiveMQConnectionFactory amqCF() {
        RedeliveryPolicy policy = new RedeliveryPolicy();
        policy.setMaximumRedeliveries(3);
        ActiveMQConnectionFactory factory =
                new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
        factory.setRedeliveryPolicy(policy);
        return factory;
    }

    /**
     * Wraps {@link #amqCF()} in a {@link CachingConnectionFactory}.
     * This method carries no {@code @Bean} annotation in this class.
     */
    public CachingConnectionFactory connectionFactory() {
        ActiveMQConnectionFactory target = amqCF();
        return new CachingConnectionFactory(target);
    }

    /**
     * Logs each message received from the "ActiveMQ.DLQ" destination.
     */
    @JmsListener(destination = "ActiveMQ.DLQ")
    public void listen(String in) {
        logger.info(in);
    }

}
与 SO 问题相关:
在 maximumRedeliveries 设置为 3 的 connectionFactory 上使用事务性轮询器和 JmsTransactionManager,会导致实际的重新投递(redelivery)尝试次数翻倍。
我怎样才能让它遵守连接工厂的重新投递设置?
我的 connectionFactory 构建为:
/**
 * Declares the ActiveMQ connection factory bean named "spring-int-connection-factory".
 *
 * <p>Delegates to {@code buildConnectionFactory}, passing the configured broker URL,
 * {@code DELAY_2_SECS}, {@code MAX_REDELIVERIES} and the "spring-int" client ID prefix.
 *
 * @return the factory produced by {@code buildConnectionFactory}
 */
@Bean(name = "spring-int-connection-factory")
ActiveMQConnectionFactory jmsConnectionFactory() {
    final String clientIdPrefix = "spring-int";
    return buildConnectionFactory(brokerUrl, DELAY_2_SECS, MAX_REDELIVERIES, clientIdPrefix);
}
/**
 * Builds an {@link ActiveMQConnectionFactory} for the given broker, optionally with a
 * custom redelivery policy.
 *
 * <p>When {@code maxRedeliveries} is non-null, a {@link RedeliveryPolicy} is created with
 * {@code retryDelay} as both its initial and subsequent redelivery delay and
 * {@code maxRedeliveries} as its maximum, and is attached to the factory. When
 * {@code maxRedeliveries} is null, no policy is configured and the factory's own default
 * applies.
 *
 * @param brokerUrl       broker URL set on the factory
 * @param retryDelay      delay passed to both {@code setInitialRedeliveryDelay} and
 *                        {@code setRedeliveryDelay}; defaults to 500 when null
 * @param maxRedeliveries maximum number of redeliveries, or null to skip policy configuration
 * @param clientIdPrefix  client ID prefix set on the factory
 * @return the configured connection factory
 */
public static ActiveMQConnectionFactory buildConnectionFactory(String brokerUrl, Long retryDelay, Integer maxRedeliveries, String clientIdPrefix){
    ActiveMQConnectionFactory amqcf = new ActiveMQConnectionFactory();
    amqcf.setBrokerURL(brokerUrl);
    amqcf.setClientIDPrefix(clientIdPrefix);
    if (maxRedeliveries != null) {
        if (retryDelay == null) {
            retryDelay = 500L;
        }
        RedeliveryPolicy rp = new org.apache.activemq.RedeliveryPolicy();
        rp.setInitialRedeliveryDelay(retryDelay);
        rp.setRedeliveryDelay(retryDelay);
        rp.setMaximumRedeliveries(maxRedeliveries);
        // Attach the policy to the factory. Without this call the policy object is
        // discarded and the factory keeps its default redelivery settings, so the
        // requested maximum is never honored.
        amqcf.setRedeliveryPolicy(rp);
    }
    return amqcf;
}
我的轮询流程如下:
/**
 * Defines a polled JMS inbound flow that reads from {@code INPUT_DIRECT_QUEUE} and hands
 * each message to {@code msgHandler}.
 *
 * <p>The adapter's JmsTemplate uses a receive timeout of 1000 and a transacted session.
 * The poller runs with a fixed delay of 5000, is transactional, routes errors to
 * "customErrorChannel" and handles at most 2 messages per poll.
 *
 * @param connectionFactory the "spring-int-connection-factory" bean used by the inbound adapter
 * @return the assembled integration flow
 */
@Bean
public IntegrationFlow flow2(@Qualifier("spring-int-connection-factory") ConnectionFactory connectionFactory) {
    return IntegrationFlows
            .from(Jms.inboundAdapter(connectionFactory)
                            .configureJmsTemplate(template -> template.receiveTimeout(1000).sessionTransacted(true))
                            .destination(INPUT_DIRECT_QUEUE),
                    endpoint -> endpoint.poller(Pollers.fixedDelay(5000)
                            .transactional()
                            .errorChannel("customErrorChannel")
                            .maxMessagesPerPoll(2)))
            .handle(this.msgHandler)
            .get();
}
我的 errorChannel 处理程序只是重新抛出,这导致 JMS 重新传递发生。
当我将处理程序设置为始终抛出异常并运行时,我看到消息处理程序实际收到消息 7 次(1 次初始投递和 6 次重新投递)。
根据我的 connectionFactory 配置,我预计只有 3 次重新传送。
知道是什么原因导致尝试次数加倍以及如何缓解吗?
这在我这里工作正常——投递计数停在 4(1 次初始投递加 3 次重新投递)...
/**
 * Minimal Spring Boot application demonstrating JMS redelivery with a transactional
 * poller and a {@link JmsTransactionManager}.
 *
 * <p>It sends one "test" message to the "foo" destination, polls that destination with a
 * handler that always throws, limits redeliveries to 3 on the ActiveMQ connection factory,
 * and logs whatever arrives on "ActiveMQ.DLQ".
 */
@SpringBootApplication
public class So51792909Application {

    private static final Logger logger = LoggerFactory.getLogger(So51792909Application.class);

    public static void main(String[] args) {
        SpringApplication.run(So51792909Application.class, args);
    }

    /**
     * Returns a runner that sends a single "test" message to the "foo" destination.
     *
     * @param template the JMS template used for sending
     */
    @Bean
    public ApplicationRunner runner(JmsTemplate template) {
        return args -> {
            // Counter-style loop so the number of test messages is easy to raise.
            int sent = 0;
            while (sent < 1) {
                template.convertAndSend("foo", "test");
                sent++;
            }
        };
    }

    /**
     * Polls the "foo" destination transactionally (fixed delay 5000, at most 2 messages per
     * poll) and passes each message to a handler that prints the "JMSXDeliveryCount" header,
     * sleeps for 2000 ms and then always throws a {@link RuntimeException}.
     *
     * @param connectionFactory the connection factory used by the inbound adapter
     */
    @Bean
    public IntegrationFlow flow(ConnectionFactory connectionFactory) {
        return IntegrationFlows
                .from(Jms.inboundAdapter(connectionFactory).destination("foo"),
                        endpoint -> endpoint.poller(Pollers.fixedDelay(5000)
                                .transactional()
                                .maxMessagesPerPoll(2)))
                .handle((payload, headers) -> {
                    System.out.println(headers.get("JMSXDeliveryCount"));
                    try {
                        Thread.sleep(2000);
                    }
                    catch (InterruptedException interrupted) {
                        // Restore the interrupt flag, then fall through to the failure below.
                        Thread.currentThread().interrupt();
                    }
                    throw new RuntimeException("foo");
                })
                .get();
    }

    /**
     * Creates the JMS transaction manager bound to the given connection factory.
     */
    @Bean
    public JmsTransactionManager transactionManager(ConnectionFactory cf) {
        JmsTransactionManager manager = new JmsTransactionManager(cf);
        return manager;
    }

    /**
     * Creates an in-VM, non-persistent ActiveMQ connection factory whose redelivery policy
     * allows at most 3 redeliveries.
     */
    @Bean
    public ActiveMQConnectionFactory amqCF() {
        RedeliveryPolicy policy = new RedeliveryPolicy();
        policy.setMaximumRedeliveries(3);
        ActiveMQConnectionFactory factory =
                new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
        factory.setRedeliveryPolicy(policy);
        return factory;
    }

    /**
     * Wraps {@link #amqCF()} in a {@link CachingConnectionFactory}.
     * This method carries no {@code @Bean} annotation in this class.
     */
    public CachingConnectionFactory connectionFactory() {
        ActiveMQConnectionFactory target = amqCF();
        return new CachingConnectionFactory(target);
    }

    /**
     * Logs each message received from the "ActiveMQ.DLQ" destination.
     */
    @JmsListener(destination = "ActiveMQ.DLQ")
    public void listen(String in) {
        logger.info(in);
    }

}