在 jboss EAP 7 中创建 JMS 消息生产者时出错

Error while Creating JMS Message Producer in jboss EAP 7

我在 JBOSS_EAP_7.0 中配置了 JMS 主题并编写了一个简单的 java 代码来创建消息生产者。我有以下无状态 bean

@Stateless
public class ExchangeSenderFacadeWrapperBean {


    private static final OMSLogHandlerI logger = new Log4j2Handler("ClientSenderFacadeBean");
    @Resource(lookup = "java:/JmsXA")     // inject ConnectionFactory (more)
    protected ConnectionFactory  factory;


    @Resource(lookup = "java:/jms/topic/ORD_CLINT_PUSH")
    protected Topic target;

    private Connection  connection = null;
    private Session session = null;



    /**
     * Sends {@code message} as a JMS {@code TextMessage} to the injected {@code target} topic.
     *
     * <p>The connection, session and producer are acquired for this call only and are all
     * closed in the {@code finally} block, instead of keeping a connection in a bean field.
     *
     * @param message text payload of the message to publish
     * @throws OMSCoreRuntimeException if creating the JMS resources or sending the message fails;
     *         the original exception is preserved as the cause
     */
    public void sendMessage(String message) {
        // Per-call locals; they intentionally shadow the bean's connection/session fields so
        // that no JMS resource outlives this invocation.
        Connection connection = null;
        Session session = null;
        MessageProducer producer = null;
        try {
            connection = factory.createConnection();
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            producer = session.createProducer(target);
            producer.setDisableMessageID(true);
            // The time-to-live must be configured before send(); setting it afterwards has no
            // effect on the message that was already sent.
            producer.setTimeToLive(900000);//15min  //todo
            TextMessage outmsg = session.createTextMessage(message);
            producer.send(outmsg);
            logger.info("Message was sent to Topic");
        } catch (Exception e) {
            logger.error(" Error when sending order to jboss:", e);
            throw new OMSCoreRuntimeException(e.getMessage(), e);
        } finally {
            // Close in reverse order of creation; a failure to close one resource must not
            // prevent the attempt to close the others.
            try {
                if (producer != null)
                    producer.close();
            } catch (JMSException e) {
                logger.warn("\n jms producer close error:",e);
            }
            try {
                if (session != null)
                    session.close();
            } catch (JMSException e) {
                logger.warn("\n jms session close error:",e);
            }
            try {
                if (connection != null)
                    connection.close();
            } catch (JMSException e) {
                logger.warn("\n jms connection close error:",e);
            }
        }
    }

这段代码一直工作正常,直到我做了一个简单的改动:把 sendMessage(String message) 方法移到了一个 POJO 类中,如下所示。

/**
 * Stateless EJB facade that forwards exchange messages to the
 * {@code ExchangeSenderFacadeCoreI} instance obtained from the bean registry.
 *
 * <p>The injected connection factory and target queue are handed to that core object
 * once, during {@link #init()}.
 */
@Stateless(name = "ExchangeSenderFacadeBean")
@Local({ExchangeSenderFacadeLocalI.class})
public class ExchangeSenderFacadeWrapperBean implements ExchangeSenderFacadeLocalI {
    /** Container-injected JMS connection factory bound at {@code java:/JmsXA}. */
    @Resource(lookup = "java:/JmsXA")
    protected ConnectionFactory factory;

    /** Registry loader EJB used to look up the shared core objects. */
    @EJB(beanName = "BeanRegistryLoader")
    protected BeanRegistryLoader omsRegistryBean;

    /** Registry returned by {@code omsRegistryBean.registry()}; assigned in {@link #init()}. */
    protected BeanRegistryCore beanRegistryCore;

    /** Container-injected destination bound at {@code java:/jms/queue/ToExchange}. */
    @Resource(lookup = "java:/jms/queue/ToExchange")
    protected Queue target;

    /** Delegate that performs the actual send; resolved in {@link #init()}. */
    private ExchangeSenderFacadeCoreI exchangeSenderFacadeCore;

    /**
     * Resolves the core sender from the registry and passes it the registry, the
     * connection factory and the target queue. Does nothing beyond refreshing
     * {@code beanRegistryCore} when the delegate is already set.
     */
    @PostConstruct
    public void init() {
        beanRegistryCore = omsRegistryBean.registry();
        if (exchangeSenderFacadeCore != null) {
            return;
        }
        BeanRegistryCore registry = omsRegistryBean.registry();
        exchangeSenderFacadeCore = registry.getExchangeSenderFacadeCoreI();
        exchangeSenderFacadeCore.setBeanRegistryCore(omsRegistryBean.registry());
        exchangeSenderFacadeCore.setFactory(factory);
        exchangeSenderFacadeCore.setTargetQueue(target);
    }

    /**
     * Delegates the message to the core sender.
     *
     * @param exchangeMessage message to send to the exchange
     */
    @Override
    public void sendToExchange(ExchangeMessage exchangeMessage) {
        exchangeSenderFacadeCore.sendToExchange(exchangeMessage);
    }

}

ConnectionFactory 和目标 Queue 变量在 EJB 的 @PostConstruct 方法中设置到 POJO 类上(如上所示);下面的 POJO 类现在包含创建生产者并向 JMS 队列发布消息的逻辑:

public class ExchangeSenderFacadeCore implements ExchangeSenderFacadeCoreI {
    private static final OMSLogHandlerI logger = new Log4j2HndlAdaptor("ExchangeSenderFacadeCore");
    private BeanRegistryCore beanRegistryCore;
    private ConnectionFactory factory;
    private Connection connection = null;
    private Session session = null;
    private long ttl = 900000;
    protected Queue targetQueue;

    public ExchangeSenderFacadeCore() {
        if (System.getProperty(OMSConst.SYS_PROPERTY_JMS_TTL) != null && System.getProperty(OMSConst.SYS_PROPERTY_JMS_TTL).length() > 0) {
            ttl = Long.parseLong(System.getProperty(OMSConst.SYS_PROPERTY_JMS_TTL));
        }
        logger.info("LN:103", "==JMS Topic TTL:" + ttl);
    }

    @Override
    public void processSendToExchange(ExchangeMessage exchangeMessage) {
        sendToExchange(exchangeMessage);
    }

    public boolean isParallelRunEnabled() {
        Object isParallelRun = beanRegistryCore.getCacheAdaptorI().cacheGet(OMSConst.DEFAULT_TENANCY_CODE, OMSConst.APP_PARAM_IS_PARALLEL_RUN, CACHE_NAMES.SYS_PARAMS_CACHE_CORE);
        if (isParallelRun != null && String.valueOf(isParallelRun).equals(OMSConst.STRING_1)) {
            return true;
        }
        return false;
    }

    @Override
    public void sendToExchange(ExchangeMessage exchangeMessage) {
        MessageProducer producer = null;
        try {
            if (isParallelRunEnabled()) {
                logger.info("LN:66", "== Message send to exchange skipped,due to parallel run enabled");
                return;
            }
            if (connection == null) {
                connection = factory.createConnection();
            }
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            producer = session.createProducer(targetQueue);
            producer.setDisableMessageID(true);
            Message message = beanRegistryCore.getJmsExchangeMsgTransformerI().transformToJMSMessage(session, exchangeMessage);
            producer.send(message);
            producer.setTimeToLive(ttl);//default 15min
            logger.elkLog("78", "-1", LogEventsEnum.SENT_TO_EXCHANGE, exchangeMessage.toString());
        } catch (Exception e) {
            logger.error("LN:80", " Error when sending order to exchange:", e);
            throw new OMSCoreRuntimeException(e.getMessage(), e);
        } finally {
            try {
                if (producer != null)
                    producer.close();
            } catch (JMSException e) {
                logger.error("LN:87", "JMS producer close error:", e);
            }
            try {
                if (session != null)
                    session.close();
            } catch (JMSException e) {
                logger.error("LN:93", "JMS session close error:", e);
            }
        }
    }

    @Override
    public void processSendToExchangeSync(ExchangeMessage exchangeMessage) {

    }

    @Override
    public BeanRegistryCore getBeanRegistryCore() {
        return beanRegistryCore;
    }

    @Override
    public void setBeanRegistryCore(BeanRegistryCore beanRegistryCore) {
        this.beanRegistryCore = beanRegistryCore;
    }

    @Override
    public ConnectionFactory getFactory() {
        return factory;
    }

    @Override
    public void setFactory(ConnectionFactory factory) {
        this.factory = factory;
    }

    @Override
    public Queue getTargetQueue() {
        return targetQueue;
    }

    @Override
    public void setTargetQueue(Queue targetQueue) {
        this.targetQueue = targetQueue;
    }
}

但是当我执行修改后的代码时,出现以下错误:

javax.ejb.EJBTransactionRolledbackException: Producer is closed

任何可能的修复方法?

在对这个问题进行深入搜索之后,我在 JBoss 开发者社区中找到了这篇文章:https://developer.jboss.org/wiki/ShouldICacheJMSConnectionsAndJMSSessions 。它清楚地解释了:当 JMS 代码运行在 JEE 应用服务器中时,为什么缓存连接及其他 JMS 相关资源是一种反模式。

简而言之,JCA 层会对 JMS 连接和 JMS 会话进行池化。因此,当您调用 createConnection() 或 createSession() 时,在大多数情况下并不会真正调用底层的 JMS 实现去创建新的 JMS 连接或 JMS 会话,而只是从 JCA 层自己的内部缓存(连接池)中返回一个。

此外,JBoss 服务器还管理着无状态会话 bean 池。一个无状态会话 bean 实例只有在完成当前调用之后才会回到 bean 池中供再次使用,而不会在此之前。与此同时,在该无状态会话 bean 中用于创建 JMS 会话(session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE))的连接(无论是新创建的还是缓存的)也已经完成了它的任务,并重新在 JCA 层的连接池中可用。因此,像下面这样在无状态 EJB 类中使用缓存的连接不会抛出异常,尽管 Oracle 并不推荐这样做。

/**
 * Transforms {@code exchangeMessage} into a JMS message and sends it to {@code targetQueue},
 * unless {@code isParallelRunEnabled()} returns true, in which case the send is skipped.
 *
 * <p>NOTE(review): {@code connection} and {@code session} are not declared in this method,
 * so they are state shared by every call on the same object. The producer and the session
 * are closed in the {@code finally} block; the connection is never closed here.
 *
 * @param exchangeMessage message to send to the exchange
 * @throws OMSCoreRuntimeException wrapping any exception raised inside the {@code try} block
 */
public void sendToExchange(ExchangeMessage exchangeMessage) {
        MessageProducer producer = null;
        try {
            if (isParallelRunEnabled()) {
                logger.info("LN:66", "== Message send to exchange skipped,due to parallel run enabled");
                return;
            }
            // The connection is created on first use only and reused by later calls.
            if (connection == null) {
                connection = factory.createConnection();
            }
            // A new session is stored in the shared reference on every call.
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            producer = session.createProducer(targetQueue);
            producer.setDisableMessageID(true);
            Message message = beanRegistryCore.getJmsExchangeMsgTransformerI().transformToJMSMessage(session, exchangeMessage);
            producer.send(message);
            // NOTE(review): this runs after send(), and the producer is closed in the finally
            // block, so the TTL cannot apply to the message sent above - confirm whether it
            // was meant to be set before send().
            producer.setTimeToLive(ttl);//default 15min
            logger.elkLog("78", "-1", LogEventsEnum.SENT_TO_EXCHANGE, exchangeMessage.toString());
        } catch (Exception e) {
            logger.error("LN:80", " Error when sending order to exchange:", e);
            throw new OMSCoreRuntimeException(e.getMessage(), e);
        } finally {
            // Close failures are logged and not rethrown; each resource is closed independently.
            try {
                if (producer != null)
                    producer.close();
            } catch (JMSException e) {
                logger.error("LN:87", "JMS producer close error:", e);
            }
            try {
                if (session != null)
                    session.close();
            } catch (JMSException e) {
                logger.error("LN:93", "JMS session close error:", e);
            }
        }
    }

但在当前这种情况下,同一个 POJO 类实例可能被多处同时使用(如下所示,每个 EJB 实例都在 @PostConstruct 中从注册表取得同一个 POJO)。此时无法保证缓存的连接已经被释放回 JCA 层的连接池并处于可用状态,因此会抛出异常。

/**
 * Resolves the core sender from the registry and passes it the registry, the connection
 * factory and the target queue. When the delegate is already set, only
 * {@code beanRegistryCore} is refreshed.
 */
@PostConstruct
    public void init() {
        beanRegistryCore = omsRegistryBean.registry();
        if (exchangeSenderFacadeCore != null) {
            return;
        }
        BeanRegistryCore registry = omsRegistryBean.registry();
        exchangeSenderFacadeCore = registry.getExchangeSenderFacadeCoreI();
        exchangeSenderFacadeCore.setBeanRegistryCore(omsRegistryBean.registry());
        exchangeSenderFacadeCore.setFactory(factory);
        exchangeSenderFacadeCore.setTargetQueue(target);
    }