ActiveMQ compositeTopic的选择性持久化

ActiveMQ compositeTopic's selective persistence

我正在使用 ActiveMQ 的 compositeTopic 将消息扇出到多个目的地,如下所示:

<broker>
    <persistenceAdapter>
        <kahaDB directory="${activemq.data}/kahadb"/>
    </persistenceAdapter>

    ...

    <destinationInterceptors>
        <virtualDestinationInterceptor>
            <virtualDestinations>
                <compositeTopic name="fan-out" forwardOnly="true">
                    <forwardTo>
                        <queue physicalName="persistent"/>
                        <queue physicalName="ephemeral"/>
                    </forwardTo>
                </compositeTopic>
            </virtualDestinations>
        </virtualDestinationInterceptor>
    </destinationInterceptors>
</broker>

所以,我想同时将消息转发到 persistentephemeral 队列。您可能会从它们的名字中猜到,我希望 persistent 队列中的消息是持久的,而我不需要 ephemeral 队列的持久性。问题是 ActiveMQ 没有基于每个目标的持久性概念,对吗?可以为整个代理设置持久性,或使用持久性/非持久性交付模式。所以,问题是:在这种情况下如何禁用 ephemeral 队列的持久性?

因此,似乎可行的解决方案是将 Apache Camel 与 ActiveMQ 一起使用。只需添加一个路由,将 ephemeral 队列排空到另一个队列设置 TTL/进程中的持久模式:

<broker>
    <persistenceAdapter>
        <kahaDB directory="${activemq.data}/kahadb"/>
    </persistenceAdapter>

    ...

    <destinationInterceptors>
        <virtualDestinationInterceptor>
            <virtualDestinations>
                <compositeTopic name="fan-out" forwardOnly="true">
                    <forwardTo>
                        <queue physicalName="persistent"/>
                        <queue physicalName="ephemeral"/>
                    </forwardTo>
                </compositeTopic>
            </virtualDestinations>
        </virtualDestinationInterceptor>
    </destinationInterceptors>
</broker>

<camelContext xmlns="http://camel.apache.org/schema/spring" id="camel">
    <route>
        <from uri="activemq:queue:ephemeral"/>
        <to uri="activemq:queue:ephemeral-backend?timeToLive=10000"/>
    </route>
</camelContext>

timeToLive 是消息的 TTL(以毫秒为单位)。在上面的配置中,消息仍然是持久的:在 TTL 到期后,它们被移动到 DLQ。如果你想扔掉它们,那么配置应该包括 deliveryPersistent 设置为 false:

<camelContext xmlns="http://camel.apache.org/schema/spring" id="camel">
  <route>
    <from uri="activemq:queue:ephemeral" />
    <to uri="activemq:queue:ephemeral-backend?timeToLive=10000&amp;deliveryPersistent=false" />
  </route>
</camelContext>