SpringBoot Artemis 持久订阅

SpringBoot Artemis Durable Subscription

我正在尝试使用 Springboot 和 Artemis 获得持久订阅。

我在 Docker 下将一个应用程序配置为发布者和两个订阅者 运行,如下所示:

version: '3'
services:
    amq:
        image: "vromero/activemq-artemis"
        environment:
            - TZ=Australia/Sydney
            - ARTEMIS_USERNAME=admin
            - ARTEMIS_PASSWORD=admin
        ports:
            - "8161:8161"
            - "61616:61616"
    pub:
        image: "amqdemo:latest"
        environment:
            - TZ=Australia/Sydney
            - SPRING_ARTEMIS_HOST=amq
            - SPRING_PROFILES_ACTIVE=publisher
            - CLIENT_ID=pub
            - MAX_SEND=1
        depends_on:
            - amq
    sub1:
        image: "amqdemo:latest"
        environment:
            - TZ=Australia/Sydney
            - SPRING_ARTEMIS_HOST=amq
            - SPRING_PROFILES_ACTIVE=subscriber1
            - CLIENT_ID=sub1
        depends_on:
            - amq
            - pub
    sub2:
        image: "amqdemo:latest"
        environment:
            - TZ=Australia/Sydney
            - SPRING_ARTEMIS_HOST=amq
            - SPRING_PROFILES_ACTIVE=subscriber2
            - CLIENT_ID=sub2
        depends_on:
            - amq
            - pub

主要 class 看起来像这样:

@SpringBootApplication
@EnableJms
@EnableScheduling
public class AmqdemoApplication {
    private Logger logger = LoggerFactory.getLogger(this.getClass());
    @Value("${CLIENT_ID}")
    private String clientId;

    public static void main(String[] args) {
        SpringApplication.run(AmqdemoApplication.class, args);
    }

    @Bean
    @Primary
    public ObjectMapper geObjMapper(){
        return new ObjectMapper()
                .registerModule(new ParameterNamesModule())
                .registerModule(new Jdk8Module())
                .registerModule(new JavaTimeModule());
    }

    @Bean
    public JmsListenerContainerFactory<?> factory(CachingConnectionFactory connectionFactory,
                                                    DefaultJmsListenerContainerFactoryConfigurer configurer) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        configurer.configure(factory, connectionFactory);

// TODO - not durable
//      factory.setSubscriptionDurable(true);

        connectionFactory.setClientId(clientId);

        return factory;
    }

    @Bean // Serialize message content to json using TextMessage
    public MessageConverter jacksonJmsMessageConverter(ObjectMapper objectMapper) {
        objectMapper.configure(MapperFeature.DEFAULT_VIEW_INCLUSION, false);
        objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

        MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
        converter.setObjectMapper(objectMapper);
        converter.setTargetType(MessageType.TEXT);
        converter.setTypeIdPropertyName("_type");
        return converter;
    }
}

大家都很开心。发布者发送和订阅者接收他们的主题:

pub_1   | Sending a message.
sub1_1  | 2021-01-13 14:58:28.463  INFO 1 --- [enerContainer-1] com.example.amqdemo.Topic1Receiver       : Message read from topic 1 : CustomMessage{text='Topic 1 message', sequence=1, secret=false, sent=2021-01-13T14:58:28.456582} transfer time: 7070µs
sub2_1  | 2021-01-13 14:58:28.470  INFO 1 --- [enerContainer-1] com.example.amqdemo.Topic2Receiver       : Message read from topic 2 : CustomMessage{text='Topic 2 message', sequence=1, secret=false, sent=2021-01-13T14:58:28.463210} transfer time: 7489µs

一旦我取消对 setSubscriptionDurable 的注释,订阅者就不再收到消息。

我找了很多需要设置clientId的参考,但是我已经设置了clientId。我在 DefaultJmsListenerContainerFactory 和 CachingConnectionFactory 上尝试了各种设置都无济于事。

Here is the full source 对于我正在处理的示例

它与 docker-compose.yml 中的 DURABLE=false 一样工作。将其设置为 true,它不再有效。

我也尝试了 PUB_SUB_DOMAIN true/false 的各种排列,但没有任何积极效果。

如果我为所有容器设置 DURABLE=true 和 PUB_SUB_DOMAIN=true,则队列如下所示:

似乎消息正在主题 1 上排队,但订阅者正在侦听没有消息的 sub1.topic1。

然后问题是您需要在 connectionFactory 上设置 pubSub 域:

@SpringBootApplication
@EnableJms
@EnableScheduling
public class AmqdemoApplication {
    // ...
    @Bean
    public JmsListenerContainerFactory<?> factory(CachingConnectionFactory connectionFactory,
                                                  DefaultJmsListenerContainerFactoryConfigurer configurer) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        configurer.configure(factory, connectionFactory);
        factory.setPubSubDomain(pubSubDomain); // <--- HERE
        factory.setClientId(clientId);
        factory.setSubscriptionDurable(durable);
        connectionFactory.setClientId(clientId);

        return factory;
    }
    // ...

和 jmsTemplate:

@Component
@Profile("publisher")
public class ScheduleConfig {
    // ...
    public ScheduleConfig(JmsTemplate jmsTemplate) {
        this.jmsTemplate = jmsTemplate;
        jmsTemplate.setPubSubDomain(true); // <--- HERE
    }