SessionAwareMessageListener 中的 rollback() 不起作用

Doesn't work rollback() in SessionAwareMessageListener

即使消息被MessageListener接收到,我也不想从Queue中移除,我想在onMessage方法中做一些处理并根据结果:

我想 commit(); 成功 - 这样消息将从队列中完全删除。

对于失败 - 不要提交 - rollback(); 因此消息将被重新传送(默认情况下有时),然后进入死信队列 (DLQ)。这对我们来说没问题。

我使用:SpringBoot 和 hornetq (spring-boot-starter-hornetq-1.4.7.RELEASE)。 设置:

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.core.env.Environment;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
import org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapter;
import org.springframework.jndi.JndiObjectFactoryBean;
import org.springframework.jndi.JndiTemplate;

import javax.jms.ConnectionFactory;
import javax.naming.Context;
import javax.naming.NamingException;
import java.util.Properties;

import static com.test.hornetq.Receiver.LOG;
import static javax.jms.Session.SESSION_TRANSACTED;

@Configuration
public class JmsConfig {

    private String host;
    private String port;
    private String connectionFactoryJndiName;
    private String jndiInit;
    private String user;
    private String password;
    private String jmsReceiverConcurrency;

    public JmsConfig(final Environment env) {
        host = env.getProperty("host");
        port = env.getProperty("port");
        connectionFactoryJndiName = env.getProperty("connectionfactory.jndiname");
        jndiInit = env.getProperty("jndiInit");
        user = env.getProperty("user");
        password = env.getProperty("password");
        jmsReceiverConcurrency = env.getProperty("jmsReceiverConcurrency");
    }

    @Bean
    public JndiTemplate jndiTemplate() {
        final JndiTemplate jndiTemplate = new JndiTemplate();
        jndiTemplate.setEnvironment(getProperties());

        return jndiTemplate;
    }

    @Bean
    public JndiObjectFactoryBean jmsConnectionFactory() throws NamingException {
        final JndiObjectFactoryBean jndiObjectFactoryBean = new JndiObjectFactoryBean();
        jndiObjectFactoryBean.setJndiTemplate(jndiTemplate());
        jndiObjectFactoryBean.setJndiName(connectionFactoryJndiName);
        jndiObjectFactoryBean.afterPropertiesSet();

        return jndiObjectFactoryBean;
    }

    @Bean
    @Primary
    public ConnectionFactory connectionFactory() throws NamingException {
        final UserCredentialsConnectionFactoryAdapter adapter = new UserCredentialsConnectionFactoryAdapter();
        adapter.setTargetConnectionFactory((ConnectionFactory) jmsConnectionFactory().getObject());
        adapter.setUsername(user);
        adapter.setPassword(password);

        return adapter;
    }

    @Bean
    public JmsListenerContainerFactory<?> myJmsContainerFactory() throws NamingException {
        final DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory());
        factory.setSubscriptionDurable(false);
        factory.setConcurrency(jmsReceiverConcurrency);
        factory.setMaxMessagesPerTask(1);
        factory.setSessionTransacted(true);
        factory.setSessionAcknowledgeMode(SESSION_TRANSACTED);
        factory.setErrorHandler(t -> {
            LOG.error("Error in listener!", t);
        });


        return factory;
    }

    private Properties getProperties() {
        final Properties jndiProps = new Properties();
        jndiProps.setProperty(Context.INITIAL_CONTEXT_FACTORY, jndiInit);
        jndiProps.setProperty(Context.PROVIDER_URL, "http-remoting://" + host + ":" + port);
        jndiProps.setProperty(Context.SECURITY_PRINCIPAL, user);
        jndiProps.setProperty(Context.SECURITY_CREDENTIALS, password);
        return jndiProps;
    }

}

接收者:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Session;

@Component
public class Receiver {

    @JmsListener(destination = "${destination.name}", containerFactory = "myJmsContainerFactory")
    public void onReceive(final MapMessage message, Session session) throws JMSException {
        try {
            System.out.println(">>>> " + message);
            session.rollback();
        } catch (Exception ex) {
            System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>THROW ");
            throw ex;
        }
    }
}

但是当我这样做时 rollback(); 没有任何反应,消息也没有返回。

代码有效。问题出在服务器端的 hornetq 设置中。

<pre-acknowledge>true</pre-acknowledge>

Extra Acknowledge Modes

Please note, that if you use pre-acknowledge mode, then you will lose transactional semantics for messages being consumed, since clearly they are being acknowledged first on the server, not when you commit the transaction. This may be stating the obvious but we like to be clear on these things to avoid confusion!