SpringBoot Artemis 持久订阅
SpringBoot Artemis Durable Subscription
我正在尝试使用 Springboot 和 Artemis 获得持久订阅。
我在 Docker 下将一个应用程序配置为发布者和两个订阅者 运行,如下所示:
version: '3'
services:
amq:
image: "vromero/activemq-artemis"
environment:
- TZ=Australia/Sydney
- ARTEMIS_USERNAME=admin
- ARTEMIS_PASSWORD=admin
ports:
- "8161:8161"
- "61616:61616"
pub:
image: "amqdemo:latest"
environment:
- TZ=Australia/Sydney
- SPRING_ARTEMIS_HOST=amq
- SPRING_PROFILES_ACTIVE=publisher
- CLIENT_ID=pub
- MAX_SEND=1
depends_on:
- amq
sub1:
image: "amqdemo:latest"
environment:
- TZ=Australia/Sydney
- SPRING_ARTEMIS_HOST=amq
- SPRING_PROFILES_ACTIVE=subscriber1
- CLIENT_ID=sub1
depends_on:
- amq
- pub
sub2:
image: "amqdemo:latest"
environment:
- TZ=Australia/Sydney
- SPRING_ARTEMIS_HOST=amq
- SPRING_PROFILES_ACTIVE=subscriber2
- CLIENT_ID=sub2
depends_on:
- amq
- pub
主要 class 看起来像这样:
@SpringBootApplication
@EnableJms
@EnableScheduling
public class AmqdemoApplication {
private Logger logger = LoggerFactory.getLogger(this.getClass());
@Value("${CLIENT_ID}")
private String clientId;
public static void main(String[] args) {
SpringApplication.run(AmqdemoApplication.class, args);
}
@Bean
@Primary
public ObjectMapper geObjMapper(){
return new ObjectMapper()
.registerModule(new ParameterNamesModule())
.registerModule(new Jdk8Module())
.registerModule(new JavaTimeModule());
}
@Bean
public JmsListenerContainerFactory<?> factory(CachingConnectionFactory connectionFactory,
DefaultJmsListenerContainerFactoryConfigurer configurer) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
configurer.configure(factory, connectionFactory);
// TODO - not durable
// factory.setSubscriptionDurable(true);
connectionFactory.setClientId(clientId);
return factory;
}
@Bean // Serialize message content to json using TextMessage
public MessageConverter jacksonJmsMessageConverter(ObjectMapper objectMapper) {
objectMapper.configure(MapperFeature.DEFAULT_VIEW_INCLUSION, false);
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
converter.setObjectMapper(objectMapper);
converter.setTargetType(MessageType.TEXT);
converter.setTypeIdPropertyName("_type");
return converter;
}
}
大家都很开心。发布者发送和订阅者接收他们的主题:
pub_1 | Sending a message.
sub1_1 | 2021-01-13 14:58:28.463 INFO 1 --- [enerContainer-1] com.example.amqdemo.Topic1Receiver : Message read from topic 1 : CustomMessage{text='Topic 1 message', sequence=1, secret=false, sent=2021-01-13T14:58:28.456582} transfer time: 7070µs
sub2_1 | 2021-01-13 14:58:28.470 INFO 1 --- [enerContainer-1] com.example.amqdemo.Topic2Receiver : Message read from topic 2 : CustomMessage{text='Topic 2 message', sequence=1, secret=false, sent=2021-01-13T14:58:28.463210} transfer time: 7489µs
一旦我取消对 setSubscriptionDurable 的注释,订阅者就不再收到消息。
我找了很多需要设置clientId的参考,但是我已经设置了clientId。我在 DefaultJmsListenerContainerFactory 和 CachingConnectionFactory 上尝试了各种设置都无济于事。
Here is the full source 对于我正在处理的示例
它与 docker-compose.yml 中的 DURABLE=false 一样工作。将其设置为 true,它不再有效。
我也尝试了 PUB_SUB_DOMAIN true/false 的各种排列,但没有任何积极效果。
如果我为所有容器设置 DURABLE=true 和 PUB_SUB_DOMAIN=true,则队列如下所示:
似乎消息正在主题 1 上排队,但订阅者正在侦听没有消息的 sub1.topic1。
然后问题是您需要在 connectionFactory 上设置 pubSub 域:
@SpringBootApplication
@EnableJms
@EnableScheduling
public class AmqdemoApplication {
// ...
@Bean
public JmsListenerContainerFactory<?> factory(CachingConnectionFactory connectionFactory,
DefaultJmsListenerContainerFactoryConfigurer configurer) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
configurer.configure(factory, connectionFactory);
factory.setPubSubDomain(pubSubDomain); // <--- HERE
factory.setClientId(clientId);
factory.setSubscriptionDurable(durable);
connectionFactory.setClientId(clientId);
return factory;
}
// ...
和 jmsTemplate:
@Component
@Profile("publisher")
public class ScheduleConfig {
// ...
public ScheduleConfig(JmsTemplate jmsTemplate) {
this.jmsTemplate = jmsTemplate;
jmsTemplate.setPubSubDomain(true); // <--- HERE
}
我正在尝试使用 Springboot 和 Artemis 获得持久订阅。
我在 Docker 下将一个应用程序配置为发布者和两个订阅者 运行,如下所示:
version: '3'
services:
amq:
image: "vromero/activemq-artemis"
environment:
- TZ=Australia/Sydney
- ARTEMIS_USERNAME=admin
- ARTEMIS_PASSWORD=admin
ports:
- "8161:8161"
- "61616:61616"
pub:
image: "amqdemo:latest"
environment:
- TZ=Australia/Sydney
- SPRING_ARTEMIS_HOST=amq
- SPRING_PROFILES_ACTIVE=publisher
- CLIENT_ID=pub
- MAX_SEND=1
depends_on:
- amq
sub1:
image: "amqdemo:latest"
environment:
- TZ=Australia/Sydney
- SPRING_ARTEMIS_HOST=amq
- SPRING_PROFILES_ACTIVE=subscriber1
- CLIENT_ID=sub1
depends_on:
- amq
- pub
sub2:
image: "amqdemo:latest"
environment:
- TZ=Australia/Sydney
- SPRING_ARTEMIS_HOST=amq
- SPRING_PROFILES_ACTIVE=subscriber2
- CLIENT_ID=sub2
depends_on:
- amq
- pub
主要 class 看起来像这样:
@SpringBootApplication
@EnableJms
@EnableScheduling
public class AmqdemoApplication {
private Logger logger = LoggerFactory.getLogger(this.getClass());
@Value("${CLIENT_ID}")
private String clientId;
public static void main(String[] args) {
SpringApplication.run(AmqdemoApplication.class, args);
}
@Bean
@Primary
public ObjectMapper geObjMapper(){
return new ObjectMapper()
.registerModule(new ParameterNamesModule())
.registerModule(new Jdk8Module())
.registerModule(new JavaTimeModule());
}
@Bean
public JmsListenerContainerFactory<?> factory(CachingConnectionFactory connectionFactory,
DefaultJmsListenerContainerFactoryConfigurer configurer) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
configurer.configure(factory, connectionFactory);
// TODO - not durable
// factory.setSubscriptionDurable(true);
connectionFactory.setClientId(clientId);
return factory;
}
@Bean // Serialize message content to json using TextMessage
public MessageConverter jacksonJmsMessageConverter(ObjectMapper objectMapper) {
objectMapper.configure(MapperFeature.DEFAULT_VIEW_INCLUSION, false);
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
converter.setObjectMapper(objectMapper);
converter.setTargetType(MessageType.TEXT);
converter.setTypeIdPropertyName("_type");
return converter;
}
}
大家都很开心。发布者发送和订阅者接收他们的主题:
pub_1 | Sending a message.
sub1_1 | 2021-01-13 14:58:28.463 INFO 1 --- [enerContainer-1] com.example.amqdemo.Topic1Receiver : Message read from topic 1 : CustomMessage{text='Topic 1 message', sequence=1, secret=false, sent=2021-01-13T14:58:28.456582} transfer time: 7070µs
sub2_1 | 2021-01-13 14:58:28.470 INFO 1 --- [enerContainer-1] com.example.amqdemo.Topic2Receiver : Message read from topic 2 : CustomMessage{text='Topic 2 message', sequence=1, secret=false, sent=2021-01-13T14:58:28.463210} transfer time: 7489µs
一旦我取消对 setSubscriptionDurable 的注释,订阅者就不再收到消息。
我找了很多需要设置clientId的参考,但是我已经设置了clientId。我在 DefaultJmsListenerContainerFactory 和 CachingConnectionFactory 上尝试了各种设置都无济于事。
Here is the full source 对于我正在处理的示例
它与 docker-compose.yml 中的 DURABLE=false 一样工作。将其设置为 true,它不再有效。
我也尝试了 PUB_SUB_DOMAIN true/false 的各种排列,但没有任何积极效果。
如果我为所有容器设置 DURABLE=true 和 PUB_SUB_DOMAIN=true,则队列如下所示:
似乎消息正在主题 1 上排队,但订阅者正在侦听没有消息的 sub1.topic1。
然后问题是您需要在 connectionFactory 上设置 pubSub 域:
@SpringBootApplication
@EnableJms
@EnableScheduling
public class AmqdemoApplication {
// ...
@Bean
public JmsListenerContainerFactory<?> factory(CachingConnectionFactory connectionFactory,
DefaultJmsListenerContainerFactoryConfigurer configurer) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
configurer.configure(factory, connectionFactory);
factory.setPubSubDomain(pubSubDomain); // <--- HERE
factory.setClientId(clientId);
factory.setSubscriptionDurable(durable);
connectionFactory.setClientId(clientId);
return factory;
}
// ...
和 jmsTemplate:
@Component
@Profile("publisher")
public class ScheduleConfig {
// ...
public ScheduleConfig(JmsTemplate jmsTemplate) {
this.jmsTemplate = jmsTemplate;
jmsTemplate.setPubSubDomain(true); // <--- HERE
}