骆驼路线中的生产者流量控制问题(持久消息)

Issue with Producer Flow Control in Camel Route (Persistent Messages)

我在寻找正确的 activemq 配置集以确保 Apache Camel 路由中的消息吞吐量一致时遇到问题。 当前配置使用以下技术:

这是在 activemq.xml 文件中找到的代理特定配置:

<broker xmlns="http://activemq.apache.org/schema/core"
        brokerName="localhost" dataDirectory="./activemq/data/" advisorySupport="false">
        <destinationPolicy>
            <policyMap>
                <policyEntries>
                    <policyEntry queue="PICKAXE.L5.PROC.>" producerFlowControl="true" storeUsageHighWaterMark="50" />
                    <policyEntry queue="PICKAXE.L5.COL.>" producerFlowControl="true" storeUsageHighWaterMark="95" />
                </policyEntries>
            </policyMap>
        </destinationPolicy>

        <managementContext>
            <managementContext createConnector="true" />
        </managementContext>

        <persistenceAdapter>
            <kahaDB directory="./activemq/kahadb/" />
        </persistenceAdapter>

        <systemUsage>
            <systemUsage sendFailIfNoSpaceAfterTimeout="3000000">
                <memoryUsage>
                    <memoryUsage limit="750 mb" />
                </memoryUsage>
                <storeUsage>
                    <storeUsage limit="2 gb" />
                </storeUsage>
                <tempUsage>
                    <tempUsage limit="500 mb" />
                </tempUsage>
            </systemUsage>
        </systemUsage>

        <transportConnectors>

            <transportConnector name="openwire"
                uri="tcp://0.0.0.0:6616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600" />
            <transportConnector name="amqp"
                uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600" />
        </transportConnectors>
        <shutdownHooks>
            <bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" />
        </shutdownHooks>
    </broker>

我是运行下面的骆驼路线。队列 A 收到大量消息 (1000/s),因此它开始很快填满,因为这些消息的最终消费者无法跟上。由于消息数量最终达到持久存储生产者流控制规则的 50%,因此阻止将更多消息放置在队列 A 上。但是,当我通过 JMX 检查队列深度时,队列 A 和 B 都不会改变,就好像消费者被阻塞一样也。

    from(activemq:queue:PICKAXE.L5.PROC.A)
        .to(activemq:queue:PICKAXE.L5.COL.B);

    from(activemq:queue:PICKAXE.L5.COL.B)
        .autoStartup(!localFlag)
        .to(customEndpoint)
        .routeId(collectionRouteId);

大约一个星期以来,我尝试了 jms/activemq 配置的各种排列,但没有成功,所以我将不胜感激任何想法。我希望的行为是让此流中的消息消费者继续从持久存储中删除消息,这将允许消息继续彻底流动。

此问题是由过大的 sendFailIfNoSpaceAfterTimeout 引起的,在上述配置中被设置为 3000000。这导致代理在确认 send() 命令因持久存储已满而失败之前等待。

以上配置替换为:

<systemUsage sendFailIfNoSpaceAfterTimeout="300">

这确保(由于消息是持久的并且队列被集成到 Camel 路由中)当持久存储已满导致故障时,每 0.3 秒重试一次 send() 操作。