spring 启动失败时未重试 SQS JMS 消息
SQS JMS Message not being retried on a failure in spring boot
给定这样的 JMS/SQS 配置:
private final SQSConnectionFactory sqsConnectionFactory = new SQSConnectionFactory(
new ProviderConfiguration().withNumberOfMessagesToPrefetch(10),
AmazonSQSClientBuilder.defaultClient());
@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(this.sqsConnectionFactory);
factory.setDestinationResolver(new DynamicDestinationResolver());
factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
return factory;
}
和这样的接收器:
@Service
public class HandleMessage {
@Transactional
@JmsListener(destination = "${sqs.handler}")
public void receive(String message) throws IOException, JMSException {
...
if (message.contains("test"))
throw new JMSException("boom!");
...
}
我发现所有消息都在处理中,包含测试的消息消失了,而不是重试。在 SQS 配置中是否可能需要更改某些内容。
@Transactional
属性可能需要也可能不需要,但我想要的是 spring 引导向 SQS 发出消息在出现异常时失败的信号,我确信这是可能。
Maximum message size 256 KB
Last updated 5/25/2020, 12:00:10
Message retention period 4 Days
Default visibility timeout 30 Seconds
Messages available 0
Delivery delay 0 Seconds
Messages in flight (not available to other consumers) 0
Receive message wait time 0 Seconds
Messages delayed 0
Content-based deduplication -
我希望这能回答您的问题,或者至少让您朝着正确的方向前进。
您需要将 JTA 事务管理器部署为 Spring PlatformTransactionManager。有很多可用的,我用 Arjuna 取得了很好的效果。添加启动器:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jta-narayana</artifactId>
</dependency>
将部署 Arjuna 并将其配置为 PlatformTransactionManager。
有用的日志记录:
<logger name="com.arjuna" level="TRACE" additivity="false">
<appender-ref ref="STDOUT" />
</logger>
删除:
factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
并添加:
factory.setSessionTransacted(true);
当消息侦听器容器开始侦听消息时,它应该创建一个事务上下文。当您到达事务方法时,该方法应该加入事务上下文。如果该方法的提交成功,那么您应该会在 Arjuna 日志记录中看到消息 receive get committed。如果出现异常,则侦听器事务将回滚,您的消息将返回到您的队列。
只是为了搅浑水....您只有一个事务资源 - JMS 会话。您可以避免使用完整的 JTA 事务管理器。我个人认为它们并不过分占用资源。您可以创建一个 JmsTransactionManager,并将您的连接工厂传递给它。然后,您可以将消息侦听器容器工厂上的 JmsTransactionManager 设置为事务管理器。
我居然解决了这个问题,尴尬。
不过,我真的很喜欢上面答案中的所有建议,并将其标记为正确,因为在某些情况下参与交易会带来巨大的好处。
但是对于那些可能偶然发现这个问题的人来说,一个温馨的提醒是确保地球上的某个地方没有另一个服务器(在我的例子中是不同的大陆) 正在使用失败的消息。
一旦我确定我是这个星球上唯一听消息的机器,我就得到了我期望收到的所有重试。
给定这样的 JMS/SQS 配置:
private final SQSConnectionFactory sqsConnectionFactory = new SQSConnectionFactory(
new ProviderConfiguration().withNumberOfMessagesToPrefetch(10),
AmazonSQSClientBuilder.defaultClient());
@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(this.sqsConnectionFactory);
factory.setDestinationResolver(new DynamicDestinationResolver());
factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
return factory;
}
和这样的接收器:
@Service
public class HandleMessage {
@Transactional
@JmsListener(destination = "${sqs.handler}")
public void receive(String message) throws IOException, JMSException {
...
if (message.contains("test"))
throw new JMSException("boom!");
...
}
我发现所有消息都在处理中,包含测试的消息消失了,而不是重试。在 SQS 配置中是否可能需要更改某些内容。
@Transactional
属性可能需要也可能不需要,但我想要的是 spring 引导向 SQS 发出消息在出现异常时失败的信号,我确信这是可能。
Maximum message size 256 KB
Last updated 5/25/2020, 12:00:10
Message retention period 4 Days
Default visibility timeout 30 Seconds
Messages available 0
Delivery delay 0 Seconds
Messages in flight (not available to other consumers) 0
Receive message wait time 0 Seconds
Messages delayed 0
Content-based deduplication -
我希望这能回答您的问题,或者至少让您朝着正确的方向前进。
您需要将 JTA 事务管理器部署为 Spring PlatformTransactionManager。有很多可用的,我用 Arjuna 取得了很好的效果。添加启动器:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jta-narayana</artifactId>
</dependency>
将部署 Arjuna 并将其配置为 PlatformTransactionManager。
有用的日志记录:
<logger name="com.arjuna" level="TRACE" additivity="false">
<appender-ref ref="STDOUT" />
</logger>
删除:
factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
并添加:
factory.setSessionTransacted(true);
当消息侦听器容器开始侦听消息时,它应该创建一个事务上下文。当您到达事务方法时,该方法应该加入事务上下文。如果该方法的提交成功,那么您应该会在 Arjuna 日志记录中看到消息 receive get committed。如果出现异常,则侦听器事务将回滚,您的消息将返回到您的队列。
只是为了搅浑水....您只有一个事务资源 - JMS 会话。您可以避免使用完整的 JTA 事务管理器。我个人认为它们并不过分占用资源。您可以创建一个 JmsTransactionManager,并将您的连接工厂传递给它。然后,您可以将消息侦听器容器工厂上的 JmsTransactionManager 设置为事务管理器。
我居然解决了这个问题,尴尬。
不过,我真的很喜欢上面答案中的所有建议,并将其标记为正确,因为在某些情况下参与交易会带来巨大的好处。
但是对于那些可能偶然发现这个问题的人来说,一个温馨的提醒是确保地球上的某个地方没有另一个服务器(在我的例子中是不同的大陆) 正在使用失败的消息。
一旦我确定我是这个星球上唯一听消息的机器,我就得到了我期望收到的所有重试。