ConnectionFactory 在共享时抛出错误

ConnectionFactory throwing errors when shared

我有一个非常简单的应用程序,可以将消息添加到队列并使用 MessagerListener 读取它们。

编辑:我在 Artemis 的单个实例上进行了测试,该实例已设置为 docker 上双实例集群的一部分。

我想创建一次 ConnectionFactory 并将其重新用于应用程序中的所有生产者和消费者。 我创建了 ConnectionFactory 并将其存储在静态变量(单例)中,以便可以从任何地方访问它。 目的是客户端在需要时使用此共享连接工厂创建新连接。 但是,我注意到这样做会在尝试创建新连接时导致“无法创建会话工厂”。

javax.jms.JMSException: Failed to create session factory
    at org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory.createConnectionInternal(ActiveMQConnectionFactory.java:886)
    at org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory.createConnection(ActiveMQConnectionFactory.java:299)
    at com.test.artemistest.jms.QueueTest2.getMessagesFromQueue(QueueTest2.java:137)
    at com.test.artemistest.jms.QueueTest2.access[=10=]0(QueueTest2.java:61)
    at com.test.artemistest.jms.QueueTest2.run(QueueTest2.java:75)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:830)
Caused by: ActiveMQNotConnectedException[errorType=NOT_CONNECTED message=AMQ219007: Cannot connect to server(s). Tried with all available servers.]
    at org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl.createSessionFactory(ServerLocatorImpl.java:690)
    at org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory.createConnectionInternal(ActiveMQConnectionFactory.java:884)

如果我每次调用都创建一个连接工厂,则不会发生此错误。 这样做似乎效率很低。

我在下面重新创建了一个类似的问题。 如果我在 main 方法中创建连接工厂,则会发生错误。 但是,如果在方法中使用之前创建,它会按预期工作。

如果我添加两个侦听器,即使它们在不同的线程中,也会发生错误。是否与连接未在消费者中关闭但在生产者中关闭有关?

为什么会这样,您是否建议共享连接工厂?

谢谢

public class QueueTest2 {

    private static boolean shutdown = false;

    private static ConnectionFactory cf;

    public static void main(String[] args) {

        // uncomment below for error to occur
//        QueueTest2.getConnectionFactory("localhost", 61616);
        ExecutorService executor = Executors.newCachedThreadPool();

        executor.execute(new Runnable() {
            @Override
            public void run() {
                getMessagesFromQueue("localhost", 61616);
                while (!shutdown) {
                    try {
                        Thread.sleep(1000L);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                System.out.println("getMessagesFromQueue shutdown");
            }
        });

        addMessagesToQueue("localhost", 61616);

       // uncommenting below also causes the issue
//        executor.execute(new Runnable() {
//            @Override
//            public void run() {
//                getMessagesFromQueue("localhost", 61616);
//                while (!shutdown) {
//                    try {
//                        Thread.sleep(1000L);
//                    } catch (InterruptedException e) {
//                        e.printStackTrace();
//                    }
//                }
//                System.out.println("getMessagesFromQueue shutdown");
//            }
//        });
        

        addMessagesToQueue("localhost", 61616);

        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        shutdown = true;
        executor.shutdownNow();
    }

    private static void addMessagesToQueue(String host, int port) {

        ConnectionFactory cf2 = getConnectionFactory(host, port);

        Connection connection = null;
        Session sessionQueue = null;

        try {

            connection = cf2.createConnection("artemis", "password");
            connection.setClientID("Producer");

            sessionQueue = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
            Queue orderQueue = sessionQueue.createQueue("exampleQueue");
            MessageProducer producerQueue = sessionQueue.createProducer(orderQueue);
            connection.start();

            // send 100 messages
            for (int i = 0; i < 100; i++) {
                TextMessage message = sessionQueue.createTextMessage("This is an order: " + i);
                producerQueue.send(message);
            }

        } catch (JMSException ex) {
            Logger.getLogger(QueueTest2.class.getName()).log(Level.SEVERE, null, ex);
        } finally {
            try {
                if (sessionQueue != null) {
                    sessionQueue.close();
                }
            } catch (JMSException ex) {
                Logger.getLogger(QueueTest2.class.getName()).log(Level.SEVERE, null, ex);
            }
            try {
                if (connection != null) {
                    connection.close();
                }
            } catch (JMSException ex) {
                Logger.getLogger(QueueTest2.class.getName()).log(Level.SEVERE, null, ex);
            }
        }

    }

    private static void getMessagesFromQueue(String host, int port) {
        ConnectionFactory cf2 = getConnectionFactory(host, port);
        Connection connection2 = null;

        Session sessionQueue2;

        try {

            connection2 = cf2.createConnection("artemis", "password");
            connection2.setClientID("Consumer2");

            sessionQueue2 = connection2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
            Queue orderQueue = sessionQueue2.createQueue("exampleQueue");

            MessageConsumer consumerQueue = sessionQueue2.createConsumer(orderQueue);
            consumerQueue.setMessageListener(new MessageHandlerTest2());

            connection2.start();

            Thread.sleep(5000);

        } catch (JMSException ex) {
            Logger.getLogger(QueueTest2.class.getName()).log(Level.SEVERE, null, ex);
        } catch (InterruptedException ex) {
            Logger.getLogger(QueueTest2.class.getName()).log(Level.SEVERE, null, ex);
        }

    }

    private static ConnectionFactory getConnectionFactory(String host, int port) {
        if (cf == null) {
            Map<String, Object> connectionParams2 = new HashMap<String, Object>();

            connectionParams2.put(TransportConstants.PORT_PROP_NAME, port);
            connectionParams2.put(TransportConstants.HOST_PROP_NAME, host);
            TransportConfiguration transportConfiguration = new TransportConfiguration(NettyConnectorFactory.class
                    .getName(), connectionParams2);
            cf = ActiveMQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, transportConfiguration);
        }
        return cf;
    }

}

class MessageHandlerTest2 implements MessageListener {

    @Override
    public void onMessage(Message message) {
        try {
            System.out.println("new message: " + ((TextMessage) message).getText());

            message.acknowledge();
        } catch (JMSException ex) {
            Logger.getLogger(MessageHandlerTest2.class.getName()).log(Level.SEVERE, null, ex);
        }

    }
}

我已经 运行 你的代码,但我没有看到任何错误。我的猜测是可能存在与并发相关的时间问题。尝试将 synchronized 添加到您的 getConnectionFactory 方法中,因为理论上它可以被应用程序中的多个线程同时调用,例如:

private synchronized static ConnectionFactory getConnectionFactory(String host, int port)

我找到了一个适用于集群环境和 docker 的解决方案。 它涉及使用“pooled-jms”连接池。我本来打算用的东西。

虽然它没有解释我在上面看到的问题,但至少在我可以进一步调查之前它是一个解决方法。

上面提到的“警告:AMQ212064:无法接收集群拓扑”似乎是一个转移注意力的问题,因为它出现的速度和出现的速度一样快。