使用 Apache Camel 和 AMQ Artemis 处理大消息

Handle large messages with Apache Camel and AMQ Artemis

当我在 AMQ Artemis 队列中收到一条大消息 (100KiB+) 并尝试将此消息路由到另一个 AMQ 并且此消息具有 属性 _AMQ_LARGE_SIZE 我收到以下错误:

14:38:56.250 [Camel (CamelTestRoute) thread #1 - JmsConsumer[QUEUE.TEST]] WARN  o.a.c.c.jms.EndpointMessageListener - Execution of JMS message listener failed. Caused by: [org.apache.camel.RuntimeCamelException - javax.jms.JMSRuntimeException: Invalid address QUEUE.TEST]
org.apache.camel.RuntimeCamelException: javax.jms.JMSRuntimeException: Invalid address QUEUE.TEST

我知道如果我在连接工厂中设置 属性 minLargeMessageSize post AMQ 中的消息,则不会发生此问题。

问题是,我无法控制创建连接工厂的代码,有时它们不会设置大消息大小 属性。

有没有一种方法可以在 Camel 中使用我的连接工厂来处理这个问题?

*编辑

16:33:03.836 [Camel (CamelTestRoute) thread #1 - JmsConsumer[QUEUE.TEST]] WARN  o.a.c.c.jms.EndpointMessageListener - Execution of JMS message listener failed. Caused by: [org.apache.camel.RuntimeCamelException - javax.jms.JMSRuntimeException: Invalid address QUEUE.TEST]
org.apache.camel.RuntimeCamelException: javax.jms.JMSRuntimeException: Invalid address QUEUE.TEST
        at org.apache.camel.util.ObjectHelper.wrapRuntimeCamelException(ObjectHelper.java:1830)
        at org.apache.camel.component.jms.EndpointMessageListener$EndpointMessageListenerAsyncCallback.done(EndpointMessageListener.java:196)
        at org.apache.camel.component.jms.EndpointMessageListener.onMessage(EndpointMessageListener.java:117)
        at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:719)
        at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:679)
        at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:649)
        at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:317)
        at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:255)
        at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1168)
        at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1160)
        at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1057)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: javax.jms.JMSRuntimeException: Invalid address QUEUE.TEST
        at org.apache.activemq.artemis.jms.client.ActiveMQDestination.fromAddress(ActiveMQDestination.java:119)
        at org.apache.activemq.artemis.jms.client.ActiveMQMessage.getJMSDestination(ActiveMQMessage.java:386)
        at org.apache.camel.component.jms.JmsBinding.extractHeadersFromJms(JmsBinding.java:187)
        at org.apache.camel.component.jms.JmsMessage.populateInitialHeaders(JmsMessage.java:229)
        at org.apache.camel.impl.DefaultMessage.createHeaders(DefaultMessage.java:257)
        at org.apache.camel.component.jms.JmsMessage.ensureInitialHeaders(JmsMessage.java:214)
        at org.apache.camel.component.jms.JmsMessage.getHeader(JmsMessage.java:164)
        at org.apache.camel.impl.DefaultMessage.getHeader(DefaultMessage.java:93)
        at org.apache.camel.impl.DefaultUnitOfWork.<init>(DefaultUnitOfWork.java:115)
        at org.apache.camel.impl.MDCUnitOfWork.<init>(MDCUnitOfWork.java:54)
        at org.apache.camel.impl.DefaultUnitOfWorkFactory.createUnitOfWork(DefaultUnitOfWorkFactory.java:32)
        at org.apache.camel.processor.CamelInternalProcessor$UnitOfWorkProcessorAdvice.createUnitOfWork(CamelInternalProcessor.java:695)
        at org.apache.camel.processor.CamelInternalProcessor$UnitOfWorkProcessorAdvice.before(CamelInternalProcessor.java:663)
        at org.apache.camel.processor.CamelInternalProcessor$UnitOfWorkProcessorAdvice.before(CamelInternalProcessor.java:634)
        at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:149)
        at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:97)
        at org.apache.camel.component.jms.EndpointMessageListener.onMessage(EndpointMessageListener.java:113)
        ... 11 common frames omitted

如果您针对 Artemis 2.x 代理使用 Artemis 1.x 客户端,那么您需要使用适当的 anycastPrefixmulticastPrefix,例如:

<acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;anycastPrefix=jms.queue.;multicastPrefix=jms.topic.;useEpoll=true;amqpCredits=1000;amqpLowCredits=300</acceptor>