spring 启动失败时未重试 SQS JMS 消息

SQS JMS Message not being retried on a failure in spring boot

给定这样的 JMS/SQS 配置:

private final SQSConnectionFactory sqsConnectionFactory = new SQSConnectionFactory(
        new ProviderConfiguration().withNumberOfMessagesToPrefetch(10),
        AmazonSQSClientBuilder.defaultClient());

@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    factory.setConnectionFactory(this.sqsConnectionFactory);
    factory.setDestinationResolver(new DynamicDestinationResolver());
    factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
    return factory;
}

和这样的接收器:

@Service
public class HandleMessage {
    @Transactional
    @JmsListener(destination = "${sqs.handler}")
    public void receive(String message) throws IOException, JMSException {
        ...
        if (message.contains("test"))
          throw new JMSException("boom!");
        ...
    }

我发现所有消息都在处理中,包含测试的消息消失了,而不是重试。在 SQS 配置中是否可能需要更改某些内容。

@Transactional 属性可能需要也可能不需要,但我想要的是 spring 引导向 SQS 发出消息在出现异常时失败的信号,我确信这是可能。

Maximum message size 256 KB
Last updated 5/25/2020, 12:00:10
Message retention period 4 Days
Default visibility timeout 30 Seconds
Messages available 0
Delivery delay 0 Seconds
Messages in flight (not available to other consumers) 0
Receive message wait time 0 Seconds
Messages delayed 0
Content-based deduplication -

我希望这能回答您的问题,或者至少让您朝着正确的方向前进。

您需要将 JTA 事务管理器部署为 Spring PlatformTransactionManager。有很多可用的,我用 Arjuna 取得了很好的效果。添加启动器:

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-jta-narayana</artifactId>
    </dependency>

将部署 Arjuna 并将其配置为 PlatformTransactionManager。

有用的日志记录:

<logger name="com.arjuna" level="TRACE" additivity="false">
    <appender-ref ref="STDOUT" />
</logger>

删除:

factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);

并添加:

factory.setSessionTransacted(true);

当消息侦听器容器开始侦听消息时,它应该创建一个事务上下文。当您到达事务方法时,该方法应该加入事务上下文。如果该方法的提交成功,那么您应该会在 Arjuna 日志记录中看到消息 receive get committed。如果出现异常,则侦听器事务将回滚,您的消息将返回到您的队列。

只是为了搅浑水....您只有一个事务资源 - JMS 会话。您可以避免使用完整的 JTA 事务管理器。我个人认为它们并不过分占用资源。您可以创建一个 JmsTransactionManager,并将您的连接工厂传递给它。然后,您可以将消息侦听器容器工厂上的 JmsTransactionManager 设置为事务管理器。

我居然解决了这个问题,尴尬
不过,我真的很喜欢上面答案中的所有建议,并将其标记为正确,因为在某些情况下参与交易会带来巨大的好处。

但是对于那些可能偶然发现这个问题的人来说,一个温馨的提醒是确保地球上的某个地方没有另一个服务器(在我的例子中是不同的大陆) 正在使用失败的消息。

一旦我确定我是这个星球上唯一听消息的机器,我就得到了我期望收到的所有重试。