ConnectionFactory 在共享时抛出错误
ConnectionFactory throwing errors when shared
我有一个非常简单的应用程序,可以将消息添加到队列并使用 MessagerListener 读取它们。
编辑:我在 Artemis 的单个实例上进行了测试,该实例已设置为 docker 上双实例集群的一部分。
我想创建一次 ConnectionFactory 并将其重新用于应用程序中的所有生产者和消费者。
我创建了 ConnectionFactory 并将其存储在静态变量(单例)中,以便可以从任何地方访问它。
目的是客户端在需要时使用此共享连接工厂创建新连接。
但是,我注意到这样做会在尝试创建新连接时导致“无法创建会话工厂”。
javax.jms.JMSException: Failed to create session factory
at org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory.createConnectionInternal(ActiveMQConnectionFactory.java:886)
at org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory.createConnection(ActiveMQConnectionFactory.java:299)
at com.test.artemistest.jms.QueueTest2.getMessagesFromQueue(QueueTest2.java:137)
at com.test.artemistest.jms.QueueTest2.access[=10=]0(QueueTest2.java:61)
at com.test.artemistest.jms.QueueTest2.run(QueueTest2.java:75)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:830)
Caused by: ActiveMQNotConnectedException[errorType=NOT_CONNECTED message=AMQ219007: Cannot connect to server(s). Tried with all available servers.]
at org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl.createSessionFactory(ServerLocatorImpl.java:690)
at org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory.createConnectionInternal(ActiveMQConnectionFactory.java:884)
如果我每次调用都创建一个连接工厂,则不会发生此错误。
这样做似乎效率很低。
我在下面重新创建了一个类似的问题。
如果我在 main 方法中创建连接工厂,则会发生错误。
但是,如果在方法中使用之前创建,它会按预期工作。
如果我添加两个侦听器,即使它们在不同的线程中,也会发生错误。是否与连接未在消费者中关闭但在生产者中关闭有关?
为什么会这样,您是否建议共享连接工厂?
谢谢
public class QueueTest2 {
private static boolean shutdown = false;
private static ConnectionFactory cf;
public static void main(String[] args) {
// uncomment below for error to occur
// QueueTest2.getConnectionFactory("localhost", 61616);
ExecutorService executor = Executors.newCachedThreadPool();
executor.execute(new Runnable() {
@Override
public void run() {
getMessagesFromQueue("localhost", 61616);
while (!shutdown) {
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("getMessagesFromQueue shutdown");
}
});
addMessagesToQueue("localhost", 61616);
// uncommenting below also causes the issue
// executor.execute(new Runnable() {
// @Override
// public void run() {
// getMessagesFromQueue("localhost", 61616);
// while (!shutdown) {
// try {
// Thread.sleep(1000L);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// }
// System.out.println("getMessagesFromQueue shutdown");
// }
// });
addMessagesToQueue("localhost", 61616);
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
shutdown = true;
executor.shutdownNow();
}
private static void addMessagesToQueue(String host, int port) {
ConnectionFactory cf2 = getConnectionFactory(host, port);
Connection connection = null;
Session sessionQueue = null;
try {
connection = cf2.createConnection("artemis", "password");
connection.setClientID("Producer");
sessionQueue = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Queue orderQueue = sessionQueue.createQueue("exampleQueue");
MessageProducer producerQueue = sessionQueue.createProducer(orderQueue);
connection.start();
// send 100 messages
for (int i = 0; i < 100; i++) {
TextMessage message = sessionQueue.createTextMessage("This is an order: " + i);
producerQueue.send(message);
}
} catch (JMSException ex) {
Logger.getLogger(QueueTest2.class.getName()).log(Level.SEVERE, null, ex);
} finally {
try {
if (sessionQueue != null) {
sessionQueue.close();
}
} catch (JMSException ex) {
Logger.getLogger(QueueTest2.class.getName()).log(Level.SEVERE, null, ex);
}
try {
if (connection != null) {
connection.close();
}
} catch (JMSException ex) {
Logger.getLogger(QueueTest2.class.getName()).log(Level.SEVERE, null, ex);
}
}
}
private static void getMessagesFromQueue(String host, int port) {
ConnectionFactory cf2 = getConnectionFactory(host, port);
Connection connection2 = null;
Session sessionQueue2;
try {
connection2 = cf2.createConnection("artemis", "password");
connection2.setClientID("Consumer2");
sessionQueue2 = connection2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Queue orderQueue = sessionQueue2.createQueue("exampleQueue");
MessageConsumer consumerQueue = sessionQueue2.createConsumer(orderQueue);
consumerQueue.setMessageListener(new MessageHandlerTest2());
connection2.start();
Thread.sleep(5000);
} catch (JMSException ex) {
Logger.getLogger(QueueTest2.class.getName()).log(Level.SEVERE, null, ex);
} catch (InterruptedException ex) {
Logger.getLogger(QueueTest2.class.getName()).log(Level.SEVERE, null, ex);
}
}
private static ConnectionFactory getConnectionFactory(String host, int port) {
if (cf == null) {
Map<String, Object> connectionParams2 = new HashMap<String, Object>();
connectionParams2.put(TransportConstants.PORT_PROP_NAME, port);
connectionParams2.put(TransportConstants.HOST_PROP_NAME, host);
TransportConfiguration transportConfiguration = new TransportConfiguration(NettyConnectorFactory.class
.getName(), connectionParams2);
cf = ActiveMQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, transportConfiguration);
}
return cf;
}
}
class MessageHandlerTest2 implements MessageListener {
@Override
public void onMessage(Message message) {
try {
System.out.println("new message: " + ((TextMessage) message).getText());
message.acknowledge();
} catch (JMSException ex) {
Logger.getLogger(MessageHandlerTest2.class.getName()).log(Level.SEVERE, null, ex);
}
}
}
我已经 运行 你的代码,但我没有看到任何错误。我的猜测是可能存在与并发相关的时间问题。尝试将 synchronized
添加到您的 getConnectionFactory
方法中,因为理论上它可以被应用程序中的多个线程同时调用,例如:
private synchronized static ConnectionFactory getConnectionFactory(String host, int port)
我找到了一个适用于集群环境和 docker 的解决方案。
它涉及使用“pooled-jms”连接池。我本来打算用的东西。
虽然它没有解释我在上面看到的问题,但至少在我可以进一步调查之前它是一个解决方法。
上面提到的“警告:AMQ212064:无法接收集群拓扑”似乎是一个转移注意力的问题,因为它出现的速度和出现的速度一样快。
我有一个非常简单的应用程序,可以将消息添加到队列并使用 MessagerListener 读取它们。
编辑:我在 Artemis 的单个实例上进行了测试,该实例已设置为 docker 上双实例集群的一部分。
我想创建一次 ConnectionFactory 并将其重新用于应用程序中的所有生产者和消费者。 我创建了 ConnectionFactory 并将其存储在静态变量(单例)中,以便可以从任何地方访问它。 目的是客户端在需要时使用此共享连接工厂创建新连接。 但是,我注意到这样做会在尝试创建新连接时导致“无法创建会话工厂”。
javax.jms.JMSException: Failed to create session factory
at org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory.createConnectionInternal(ActiveMQConnectionFactory.java:886)
at org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory.createConnection(ActiveMQConnectionFactory.java:299)
at com.test.artemistest.jms.QueueTest2.getMessagesFromQueue(QueueTest2.java:137)
at com.test.artemistest.jms.QueueTest2.access[=10=]0(QueueTest2.java:61)
at com.test.artemistest.jms.QueueTest2.run(QueueTest2.java:75)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:830)
Caused by: ActiveMQNotConnectedException[errorType=NOT_CONNECTED message=AMQ219007: Cannot connect to server(s). Tried with all available servers.]
at org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl.createSessionFactory(ServerLocatorImpl.java:690)
at org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory.createConnectionInternal(ActiveMQConnectionFactory.java:884)
如果我每次调用都创建一个连接工厂,则不会发生此错误。 这样做似乎效率很低。
我在下面重新创建了一个类似的问题。 如果我在 main 方法中创建连接工厂,则会发生错误。 但是,如果在方法中使用之前创建,它会按预期工作。
如果我添加两个侦听器,即使它们在不同的线程中,也会发生错误。是否与连接未在消费者中关闭但在生产者中关闭有关?
为什么会这样,您是否建议共享连接工厂?
谢谢
public class QueueTest2 {
private static boolean shutdown = false;
private static ConnectionFactory cf;
public static void main(String[] args) {
// uncomment below for error to occur
// QueueTest2.getConnectionFactory("localhost", 61616);
ExecutorService executor = Executors.newCachedThreadPool();
executor.execute(new Runnable() {
@Override
public void run() {
getMessagesFromQueue("localhost", 61616);
while (!shutdown) {
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("getMessagesFromQueue shutdown");
}
});
addMessagesToQueue("localhost", 61616);
// uncommenting below also causes the issue
// executor.execute(new Runnable() {
// @Override
// public void run() {
// getMessagesFromQueue("localhost", 61616);
// while (!shutdown) {
// try {
// Thread.sleep(1000L);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// }
// System.out.println("getMessagesFromQueue shutdown");
// }
// });
addMessagesToQueue("localhost", 61616);
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
shutdown = true;
executor.shutdownNow();
}
private static void addMessagesToQueue(String host, int port) {
ConnectionFactory cf2 = getConnectionFactory(host, port);
Connection connection = null;
Session sessionQueue = null;
try {
connection = cf2.createConnection("artemis", "password");
connection.setClientID("Producer");
sessionQueue = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Queue orderQueue = sessionQueue.createQueue("exampleQueue");
MessageProducer producerQueue = sessionQueue.createProducer(orderQueue);
connection.start();
// send 100 messages
for (int i = 0; i < 100; i++) {
TextMessage message = sessionQueue.createTextMessage("This is an order: " + i);
producerQueue.send(message);
}
} catch (JMSException ex) {
Logger.getLogger(QueueTest2.class.getName()).log(Level.SEVERE, null, ex);
} finally {
try {
if (sessionQueue != null) {
sessionQueue.close();
}
} catch (JMSException ex) {
Logger.getLogger(QueueTest2.class.getName()).log(Level.SEVERE, null, ex);
}
try {
if (connection != null) {
connection.close();
}
} catch (JMSException ex) {
Logger.getLogger(QueueTest2.class.getName()).log(Level.SEVERE, null, ex);
}
}
}
private static void getMessagesFromQueue(String host, int port) {
ConnectionFactory cf2 = getConnectionFactory(host, port);
Connection connection2 = null;
Session sessionQueue2;
try {
connection2 = cf2.createConnection("artemis", "password");
connection2.setClientID("Consumer2");
sessionQueue2 = connection2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Queue orderQueue = sessionQueue2.createQueue("exampleQueue");
MessageConsumer consumerQueue = sessionQueue2.createConsumer(orderQueue);
consumerQueue.setMessageListener(new MessageHandlerTest2());
connection2.start();
Thread.sleep(5000);
} catch (JMSException ex) {
Logger.getLogger(QueueTest2.class.getName()).log(Level.SEVERE, null, ex);
} catch (InterruptedException ex) {
Logger.getLogger(QueueTest2.class.getName()).log(Level.SEVERE, null, ex);
}
}
private static ConnectionFactory getConnectionFactory(String host, int port) {
if (cf == null) {
Map<String, Object> connectionParams2 = new HashMap<String, Object>();
connectionParams2.put(TransportConstants.PORT_PROP_NAME, port);
connectionParams2.put(TransportConstants.HOST_PROP_NAME, host);
TransportConfiguration transportConfiguration = new TransportConfiguration(NettyConnectorFactory.class
.getName(), connectionParams2);
cf = ActiveMQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, transportConfiguration);
}
return cf;
}
}
class MessageHandlerTest2 implements MessageListener {
@Override
public void onMessage(Message message) {
try {
System.out.println("new message: " + ((TextMessage) message).getText());
message.acknowledge();
} catch (JMSException ex) {
Logger.getLogger(MessageHandlerTest2.class.getName()).log(Level.SEVERE, null, ex);
}
}
}
我已经 运行 你的代码,但我没有看到任何错误。我的猜测是可能存在与并发相关的时间问题。尝试将 synchronized
添加到您的 getConnectionFactory
方法中,因为理论上它可以被应用程序中的多个线程同时调用,例如:
private synchronized static ConnectionFactory getConnectionFactory(String host, int port)
我找到了一个适用于集群环境和 docker 的解决方案。 它涉及使用“pooled-jms”连接池。我本来打算用的东西。
虽然它没有解释我在上面看到的问题,但至少在我可以进一步调查之前它是一个解决方法。
上面提到的“警告:AMQ212064:无法接收集群拓扑”似乎是一个转移注意力的问题,因为它出现的速度和出现的速度一样快。