How to configure JmsListener on ActiveMQ for autoscaling using Qpid Sender
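I have a Kubernetes cluster with an ActiveMQ Artemis queue, and I am using an HPA to autoscale the microservices. Messages are sent via a QpidSender and received via a JmsListener.
Messaging works, but I cannot configure the queue/listener in a way that makes autoscaling work as expected.
This is my Qpid sender: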
public static void send(String avroMessage, String task) throws JMSException, NamingException {
    Connection connection = createConnection();
    connection.start();
    Session session = createSession(connection);
    MessageProducer messageProducer = createProducer(session);
    TextMessage message = session.createTextMessage(avroMessage);
    // The listener's selector filters on this property.
    message.setStringProperty("task", task);
    messageProducer.send(
        message,
        DeliveryMode.NON_PERSISTENT,
        Message.DEFAULT_PRIORITY,
        Message.DEFAULT_TIME_TO_LIVE);
    connection.close();
}

private static MessageProducer createProducer(Session session) throws JMSException {
    Destination producerDestination =
        session.createQueue("queue?consumer.prefetchSize=1&heartbeat='10000'");
    return session.createProducer(producerDestination);
}

private static Session createSession(Connection connection) throws JMSException {
    return connection.createSession(Session.AUTO_ACKNOWLEDGE);
}

private static Connection createConnection() throws NamingException, JMSException {
    Hashtable<Object, Object> env = new Hashtable<>();
    env.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
    env.put("connectionfactory.factoryLookup", amqUrl);
    Context context = new javax.naming.InitialContext(env);
    ConnectionFactory connectionFactory = (ConnectionFactory) context.lookup("factoryLookup");
    // Note: the pooled factory is configured here but never used;
    // the connection returned below comes from the plain factory.
    PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory();
    pooledConnectionFactory.setConnectionFactory(connectionFactory);
    pooledConnectionFactory.setMaxConnections(10);
    return connectionFactory.createConnection(amqUsername, amqPassword);
}
This is my listener configuration:
@Bean
public JmsConnectionFactory jmsConnection() {
    JmsConnectionFactory jmsConnection = new JmsConnectionFactory();
    jmsConnection.setRemoteURI(this.amqUrl);
    jmsConnection.setUsername(this.amqUsername);
    jmsConnection.setPassword(this.amqPassword);
    return jmsConnection;
}

@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    factory.setConnectionFactory(jmsConnection());
    return factory;
}
This is my listener:
@JmsListener(
    destination = "queue?consumer.prefetchSize=1&heartbeat='10000'",
    selector = "task = 'myTask'"
)
public void receiveMsg(Message message) throws IOException, JMSException {
    message.acknowledge();
    doStuff();
}
I send messages like this:
QpidSender.send(avroMessage, "myTask");
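This setup works. I can send different messages, and as soon as there are more than 2, a second instance of my service starts and consumes messages. If the message count later drops below 2, that instance is terminated.
The problem is: I don't want to acknowledge the message before doStuff(), because if something goes wrong or the service is terminated before doStuff() finishes, the message is lost (right?).
But if I reorder it to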
doStuff();
message.acknowledge();
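the second instance cannot receive messages from the broker as long as the first instance is still in doStuff() and has not acknowledged its message.
How can I configure this so that more than one instance can consume messages from the queue, but a message is not lost if the service is terminated or doStuff() fails for some other reason?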
Use factory.setSessionTransacted(true).
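A minimal sketch of how that setting could be added to the listener configuration from the question, assuming Spring JMS and the Qpid JMS client are on the classpath in matching javax/jakarta versions. The class name JmsConfig and the property keys amq.url, amq.username and amq.password are hypothetical placeholders. With a transacted session the container commits after the listener method returns and rolls back if it throws, so an explicit message.acknowledge() in the listener should no longer be needed.

```java
import org.apache.qpid.jms.JmsConnectionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;

@Configuration
@EnableJms
public class JmsConfig { // hypothetical class name

    // Hypothetical property keys; use whatever the service already defines.
    @Value("${amq.url}")
    private String amqUrl;

    @Value("${amq.username}")
    private String amqUsername;

    @Value("${amq.password}")
    private String amqPassword;

    @Bean
    public JmsConnectionFactory jmsConnection() {
        JmsConnectionFactory jmsConnection = new JmsConnectionFactory();
        jmsConnection.setRemoteURI(this.amqUrl);
        jmsConnection.setUsername(this.amqUsername);
        jmsConnection.setPassword(this.amqPassword);
        return jmsConnection;
    }

    @Bean
    public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(jmsConnection());
        // Receive each message in a local JMS transaction: commit when the
        // listener returns normally, roll back (and redeliver) when it throws.
        factory.setSessionTransacted(true);
        return factory;
    }
}
```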
See the javadoc of DefaultMessageListenerContainer:
* <p><b>It is strongly recommended to either set {@link #setSessionTransacted
* "sessionTransacted"} to "true" or specify an external {@link #setTransactionManager
* "transactionManager"}.</b> See the {@link AbstractMessageListenerContainer}
* javadoc for details on acknowledge modes and native transaction options, as
* well as the {@link AbstractPollingMessageListenerContainer} javadoc for details
* on configuring an external transaction manager. Note that for the default
* "AUTO_ACKNOWLEDGE" mode, this container applies automatic message acknowledgment
* before listener execution, with no redelivery in case of an exception.
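The difference the javadoc describes can be illustrated with a toy model. This is only a sketch using an in-memory queue: ToyQueue, Work, consumeAckFirst and consumeTransacted are hypothetical names and not part of JMS, Spring or Qpid. It shows that acknowledging before the work runs loses the message when the work fails, while a commit-after-work, rollback-on-failure scheme puts it back for redelivery.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class AckOrderingSketch {

    /** Hypothetical in-memory stand-in for a broker queue; not a JMS API. */
    static final class ToyQueue {
        private final Deque<String> pending = new ArrayDeque<>();

        void send(String body) { pending.addLast(body); }

        /** Hands out the next message, or null if the queue is empty. */
        String receive() { return pending.pollFirst(); }

        /** Puts a message back at the head, as a rollback would. */
        void redeliver(String body) { pending.addFirst(body); }

        int depth() { return pending.size(); }
    }

    /** Stand-in for doStuff(); may throw to simulate a failure. */
    interface Work {
        void run(String body) throws Exception;
    }

    /** Acknowledge first: the message is gone before the work runs. */
    static void consumeAckFirst(ToyQueue queue, Work work) {
        String body = queue.receive(); // treated as acknowledged here
        if (body == null) {
            return;
        }
        try {
            work.run(body);
        } catch (Exception e) {
            // Already acknowledged, so nothing can be redelivered.
        }
    }

    /** Transacted: commit only if the work succeeds, otherwise roll back. */
    static void consumeTransacted(ToyQueue queue, Work work) {
        String body = queue.receive();
        if (body == null) {
            return;
        }
        try {
            work.run(body); // success: the receive is committed
        } catch (Exception e) {
            queue.redeliver(body); // failure: roll back for redelivery
        }
    }

    public static void main(String[] args) {
        Work failing = body -> { throw new IllegalStateException("doStuff failed"); };

        ToyQueue ackFirst = new ToyQueue();
        ackFirst.send("m1");
        consumeAckFirst(ackFirst, failing);
        System.out.println("ack-first depth after failure: " + ackFirst.depth());

        ToyQueue transacted = new ToyQueue();
        transacted.send("m1");
        consumeTransacted(transacted, failing);
        System.out.println("transacted depth after failure: " + transacted.depth());
    }
}
```

Running main prints a depth of 0 for the acknowledge-first case and 1 for the transacted case, i.e. only the transacted consumer still has the message available after the failure.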