ActiveMQ Artemis 是否支持更新最后一个值队列中的计划消息?
Does ActiveMQ Artemis support updating scheduled messages in a last value queue?
在我对 Artemis LastValueQueue code 的测试和审查中,消息的调度延迟似乎优先于其对“last-value-key”的评估。换句话说,如果您安排一条消息,它只会在准备传递时评估替换队列中的最后一个值。
我的问题是我是否正确理解了代码,如果是,是否有可能有助于满足我们要求的变通方法或 ActiveMQ/Artemis 的功能。
我们的要求如下:
- 生成一条消息,并将对该消息的处理延迟到将来的某个时间点(通常为 30 秒)。
- 如果由于新的外部事件生成了消息的更新版本,请将任何现有的计划消息替换为消息的新版本 - 除了消息负载之外,计划的传递时间也应更新。
一些其他注意事项:
- 我目前的原型是使用 Artemis 嵌入式服务器
- Spring-jms JmsTemplate 被用于生成消息
- Spring-jms JmsListenerContainerFactory 被用来消费消息
- 我们目前不使用 SpringBoot,因此您会在下面看到一些 bean 设置。
ArtemisConfig.java:
@Configuration
@EnableJms
public class ArtemisConfig {
@Bean
public org.apache.activemq.artemis.core.config.Configuration configuration() throws Exception {
org.apache.activemq.artemis.core.config.Configuration config = new ConfigurationImpl();
config.addAcceptorConfiguration("in-vm", "vm://0");
config.setPersistenceEnabled(true);
config.setSecurityEnabled(false);
config.setJournalType(JournalType.ASYNCIO);
config.setCreateJournalDir(true);
config.setJournalDirectory("/var/mq/journal");
config.setBindingsDirectory("/var/mq/bindings");
config.setLargeMessagesDirectory("/var/mq/large-messages");
config.setJMXManagementEnabled(true);
QueueConfiguration queueConfiguration = new QueueConfiguration("MYLASTVALUEQUEUE");
queueConfiguration.setAddress("MYLASTVALUEQUEUE");
queueConfiguration.setLastValueKey("uniqueJobId");
queueConfiguration.setDurable(true);
queueConfiguration.setEnabled(true);
queueConfiguration.setRoutingType(RoutingType.ANYCAST);
CoreAddressConfiguration coreAddressConfiguration = new CoreAddressConfiguration();
coreAddressConfiguration.addQueueConfiguration(queueConfiguration);
config.addAddressConfiguration(coreAddressConfiguration);
return config;
}
@Bean
public EmbeddedActiveMQ artemisServer() throws Exception {
EmbeddedActiveMQ server = new EmbeddedActiveMQ();
server.setConfiguration(configuration());
server.start();
return server;
}
@PreDestroy
public void preDestroy() throws Exception {
artemisServer().stop();
}
@Bean
public ConnectionFactory activeMqConnectionFactory() throws Exception {
return ActiveMQJMSClient.createConnectionFactory("vm://0", "artemis-client");
}
@Bean
public JmsListenerContainerFactory<?> jmsListenerContainerFactory() throws Exception {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(activeMqConnectionFactory());
factory.setSessionTransacted(true);
factory.setConcurrency("8");
factory.setMessageConverter(jacksonJmsMessageConverter());
return factory;
}
@Bean
public MessageConverter jacksonJmsMessageConverter() {
MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
converter.setTargetType(MessageType.TEXT);
converter.setTypeIdPropertyName("_type");
return converter;
}
@Bean
public JmsTemplate jmsTemplate() throws Exception {
JmsTemplate jmsTemplate = new JmsTemplate(activeMqConnectionFactory());
jmsTemplate.setMessageConverter(jacksonJmsMessageConverter());
jmsTemplate.setDeliveryPersistent(true);
return jmsTemplate;
}
@Bean
QueueMessageService queueMessageService() {
return new QueueMessageService();
}
}
QueueMessageService.java
public class QueueMessageService {
@Resource
private JmsTemplate jmsTemplate;
public void queueJobRequest(
final String queue,
final int priority,
final long deliveryDelayInSeconds,
final MyMessage message) {
jmsTemplate.convertAndSend(queue, jobRequest, message -> {
message.setJMSPriority(priority);
if (deliveryDelayInSeconds > 0 && deliveryDelayInSeconds <= 86400) {
message.setLongProperty(
Message.HDR_SCHEDULED_DELIVERY_TIME.toString(),
Instant.now().plus(deliveryDelayInSeconds, ChronoUnit.SECONDS).toEpochMilli()
);
}
message.setStringProperty(Message.HDR_LAST_VALUE_NAME.toString(), "uniqueJobId");
message.setStringProperty("uniqueJobId", jobRequest.getUniqueJobId().toString());
return message;
});
}
}
您对使用 last-value 队列的计划消息语义的理解是正确的。当一条消息被安排时,它在技术上 尚未在队列 上。它不会被放入队列,直到预定时间到达,此时 last-value 队列语义被强制执行。
除了实现新功能之外,我看不出如何以任何一种自动方式实现您想要的行为。此时我的建议是使用管理 API(即 QueueControl
)在发送“新”预定消息之前手动删除“旧”预定消息。您可以为此使用 removeMessage
方法之一,因为它们可以处理预定的消息和 non-scheduled 类似的消息。
在我对 Artemis LastValueQueue code 的测试和审查中,消息的调度延迟似乎优先于其对“last-value-key”的评估。换句话说,如果您安排一条消息,它只会在准备传递时评估替换队列中的最后一个值。
我的问题是我是否正确理解了代码,如果是,是否有可能有助于满足我们要求的变通方法或 ActiveMQ/Artemis 的功能。
我们的要求如下:
- 生成一条消息,并将对该消息的处理延迟到将来的某个时间点(通常为 30 秒)。
- 如果由于新的外部事件生成了消息的更新版本,请将任何现有的计划消息替换为消息的新版本 - 除了消息负载之外,计划的传递时间也应更新。
一些其他注意事项:
- 我目前的原型是使用 Artemis 嵌入式服务器
- Spring-jms JmsTemplate 被用于生成消息
- Spring-jms JmsListenerContainerFactory 被用来消费消息
- 我们目前不使用 SpringBoot,因此您会在下面看到一些 bean 设置。
ArtemisConfig.java:
@Configuration
@EnableJms
public class ArtemisConfig {
@Bean
public org.apache.activemq.artemis.core.config.Configuration configuration() throws Exception {
org.apache.activemq.artemis.core.config.Configuration config = new ConfigurationImpl();
config.addAcceptorConfiguration("in-vm", "vm://0");
config.setPersistenceEnabled(true);
config.setSecurityEnabled(false);
config.setJournalType(JournalType.ASYNCIO);
config.setCreateJournalDir(true);
config.setJournalDirectory("/var/mq/journal");
config.setBindingsDirectory("/var/mq/bindings");
config.setLargeMessagesDirectory("/var/mq/large-messages");
config.setJMXManagementEnabled(true);
QueueConfiguration queueConfiguration = new QueueConfiguration("MYLASTVALUEQUEUE");
queueConfiguration.setAddress("MYLASTVALUEQUEUE");
queueConfiguration.setLastValueKey("uniqueJobId");
queueConfiguration.setDurable(true);
queueConfiguration.setEnabled(true);
queueConfiguration.setRoutingType(RoutingType.ANYCAST);
CoreAddressConfiguration coreAddressConfiguration = new CoreAddressConfiguration();
coreAddressConfiguration.addQueueConfiguration(queueConfiguration);
config.addAddressConfiguration(coreAddressConfiguration);
return config;
}
@Bean
public EmbeddedActiveMQ artemisServer() throws Exception {
EmbeddedActiveMQ server = new EmbeddedActiveMQ();
server.setConfiguration(configuration());
server.start();
return server;
}
@PreDestroy
public void preDestroy() throws Exception {
artemisServer().stop();
}
@Bean
public ConnectionFactory activeMqConnectionFactory() throws Exception {
return ActiveMQJMSClient.createConnectionFactory("vm://0", "artemis-client");
}
@Bean
public JmsListenerContainerFactory<?> jmsListenerContainerFactory() throws Exception {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(activeMqConnectionFactory());
factory.setSessionTransacted(true);
factory.setConcurrency("8");
factory.setMessageConverter(jacksonJmsMessageConverter());
return factory;
}
@Bean
public MessageConverter jacksonJmsMessageConverter() {
MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
converter.setTargetType(MessageType.TEXT);
converter.setTypeIdPropertyName("_type");
return converter;
}
@Bean
public JmsTemplate jmsTemplate() throws Exception {
JmsTemplate jmsTemplate = new JmsTemplate(activeMqConnectionFactory());
jmsTemplate.setMessageConverter(jacksonJmsMessageConverter());
jmsTemplate.setDeliveryPersistent(true);
return jmsTemplate;
}
@Bean
QueueMessageService queueMessageService() {
return new QueueMessageService();
}
}
QueueMessageService.java
public class QueueMessageService {
@Resource
private JmsTemplate jmsTemplate;
public void queueJobRequest(
final String queue,
final int priority,
final long deliveryDelayInSeconds,
final MyMessage message) {
jmsTemplate.convertAndSend(queue, jobRequest, message -> {
message.setJMSPriority(priority);
if (deliveryDelayInSeconds > 0 && deliveryDelayInSeconds <= 86400) {
message.setLongProperty(
Message.HDR_SCHEDULED_DELIVERY_TIME.toString(),
Instant.now().plus(deliveryDelayInSeconds, ChronoUnit.SECONDS).toEpochMilli()
);
}
message.setStringProperty(Message.HDR_LAST_VALUE_NAME.toString(), "uniqueJobId");
message.setStringProperty("uniqueJobId", jobRequest.getUniqueJobId().toString());
return message;
});
}
}
您对使用 last-value 队列的计划消息语义的理解是正确的。当一条消息被安排时,它在技术上 尚未在队列 上。它不会被放入队列,直到预定时间到达,此时 last-value 队列语义被强制执行。
除了实现新功能之外,我看不出如何以任何一种自动方式实现您想要的行为。此时我的建议是使用管理 API(即 QueueControl
)在发送“新”预定消息之前手动删除“旧”预定消息。您可以为此使用 removeMessage
方法之一,因为它们可以处理预定的消息和 non-scheduled 类似的消息。