从 Kafka 到 JMS 的骆驼路线不起作用(JMS->Kafka->JMS)
Camel route from Kafka to JMS not working (JMS->Kafka->JMS)
在我的骆驼中Router.java我有下一条路线
from("jms:topic:test.source.topic?asyncConsumer=true")
.log("Message: ${body}")
.to("kafka:testing?brokers=192.168.0.100:9092");
from("kafka:testing?brokers=192.168.0.100:9092")
.log("Message received from Kafka : ${body}")
.log(" on the topic ${headers[kafka.TOPIC]}")
.log(" on the partition ${headers[kafka.PARTITION]}")
.log(" with the offset ${headers[kafka.OFFSET]}")
.log(" with the key ${headers[kafka.KEY]}")
// manually set JMSDeliveryMode (1 - NON_PERSISTENT, 2 - PERSISTENT)
.process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
exchange.getIn().setHeader("JMSDeliveryMode", "1");
}
})
.to("jms:topic:test.sink.topic");
上述 Camel 路线出现问题。如果我使用 Kafka 生产者向主题 testing
发送一些消息,而我使用 bin/kafka-console-producer.sh --topic testing --bootstrap-server localhost:9092
运行,则从 Kafka 到 JMS 的路由工作正常。所以这些链接的 Camel 路线存在一些问题。
在 Camel pom.xml 中 Spring 引导 camel-kafka-starter 和 camel-jms-starter 依赖项。
当我启动 Spring Boot Camel with Maven 并将一些消息从 Kafka 生产者发送到 Kafka broker testing
主题时,我可以看到 Kafka broker 收到了消息并且上面的日志打印正常.
第 .to("jms:topic:test.sink.topic");
行出现错误,我不知道这是什么意思。
ERROR 28642 --- [aConsumer[testing]] o.a.c.p.e.DefaultErrorHandler: Failed delivery for (MessageId: ID-PCID on ExchangeId: ID-PCID). Exhausted after delivery attempt: 1 caught: org.springframework.jms.UncategorizedJmsException: Uncategorized exception occurred during JMS processing; nested exception is javax.jms.JMSException: AMQ139015: Illegal deliveryMode value: 0
org.springframework.jms.UncategorizedJmsException: Uncategorized exception occurred during JMS processing; nested exception is javax.jms.JMSException: AMQ139015: Illegal deliveryMode value: 0
at org.springframework.jms.support.JmsUtils.convertJmsAccessException(JmsUtils.java:311) ~[spring-jms-5.2.6.RELEASE.jar:5.2.6.RELEASE]
at org.springframework.jms.support.JmsAccessor.convertJmsAccessException(JmsAccessor.java:185) ~[spring-jms-5.2.6.RELEASE.jar:5.2.6.RELEASE]
at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:507) ~[spring-jms-5.2.6.RELEASE.jar:5.2.6.RELEASE]
at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.send(JmsConfiguration.java:525) ~[camel-jms-3.4.0.jar:3.4.0]
at org.apache.camel.component.jms.JmsProducer.doSend(JmsProducer.java:438) ~[camel-jms-3.4.0.jar:3.4.0]
at org.apache.camel.component.jms.JmsProducer.processInOnly(JmsProducer.java:392) ~[camel-jms-3.4.0.jar:3.4.0]
at org.apache.camel.component.jms.JmsProducer.process(JmsProducer.java:155) ~[camel-jms-3.4.0.jar:3.4.0]
at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:168) ~[camel-base-3.4.0.jar:3.4.0]
at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$SimpleTask.run(RedeliveryErrorHandler.java:395) ~[camel-base-3.4.0.jar:3.4.0]
at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:148) ~[camel-base-3.4.0.jar:3.4.0]
at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:60) ~[camel-base-3.4.0.jar:3.4.0]
at org.apache.camel.processor.Pipeline.process(Pipeline.java:147) ~[camel-base-3.4.0.jar:3.4.0]
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:286) ~[camel-base-3.4.0.jar:3.4.0]
at org.apache.camel.impl.engine.DefaultAsyncProcessorAwaitManager.process(DefaultAsyncProcessorAwaitManager.java:83) ~[camel-base-3.4.0.jar:3.4.0]
at org.apache.camel.support.AsyncProcessorSupport.process(AsyncProcessorSupport.java:40) ~[camel-support-3.4.0.jar:3.4.0]
at org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.doRun(KafkaConsumer.java:346) ~[camel-kafka-3.4.0.jar:3.4.0]
at org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.run(KafkaConsumer.java:222) ~[camel-kafka-3.4.0.jar:3.4.0]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
Caused by: javax.jms.JMSException: AMQ139015: Illegal deliveryMode value: 0
at org.apache.activemq.artemis.jms.client.ActiveMQMessage.setJMSDeliveryMode(ActiveMQMessage.java:450) ~[artemis-jms-client-2.12.0.jar:2.12.0]
at org.apache.camel.component.jms.JmsMessageHelper.setJMSDeliveryMode(JmsMessageHelper.java:435) ~[camel-jms-3.4.0.jar:3.4.0]
at org.apache.camel.component.jms.JmsBinding.appendJmsProperty(JmsBinding.java:393) ~[camel-jms-3.4.0.jar:3.4.0]
at org.apache.camel.component.jms.JmsBinding.appendJmsProperties(JmsBinding.java:371) ~[camel-jms-3.4.0.jar:3.4.0]
at org.apache.camel.component.jms.JmsBinding.makeJmsMessage(JmsBinding.java:346) ~[camel-jms-3.4.0.jar:3.4.0]
at org.apache.camel.component.jms.JmsProducer.createMessage(JmsProducer.java:325) ~[camel-jms-3.4.0.jar:3.4.0]
at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.doSendToDestination(JmsConfiguration.java:561) ~[camel-jms-3.4.0.jar:3.4.0]
at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.lambda$send[=11=](JmsConfiguration.java:527) ~[camel-jms-3.4.0.jar:3.4.0]
at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:504) ~[spring-jms-5.2.6.RELEASE.jar:5.2.6.RELEASE]
... 19 common frames omitted
当从 JMS 向 Kafka 发送一些消息时,路由工作正常。
这是相关的错误信息:
AMQ139015: Illegal deliveryMode value: 0
此错误消息表示已调用 JMS 方法 javax.jms.Message#setJMSDeliveryMode
with a value of 0
. The value of 0
is not valid. Valid delivery mode values are defined by javax.jms.DeliveryMode
:
stack-trace 表示这个无效值是由 Camel 根据输入消息设置的。参见 this code。为了解决这个问题,您需要确定 header JMSDeliveryMode
在 org.apache.camel.Message
.
上的设置位置
在我的骆驼中Router.java我有下一条路线
from("jms:topic:test.source.topic?asyncConsumer=true")
.log("Message: ${body}")
.to("kafka:testing?brokers=192.168.0.100:9092");
from("kafka:testing?brokers=192.168.0.100:9092")
.log("Message received from Kafka : ${body}")
.log(" on the topic ${headers[kafka.TOPIC]}")
.log(" on the partition ${headers[kafka.PARTITION]}")
.log(" with the offset ${headers[kafka.OFFSET]}")
.log(" with the key ${headers[kafka.KEY]}")
// manually set JMSDeliveryMode (1 - NON_PERSISTENT, 2 - PERSISTENT)
.process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
exchange.getIn().setHeader("JMSDeliveryMode", "1");
}
})
.to("jms:topic:test.sink.topic");
上述 Camel 路线出现问题。如果我使用 Kafka 生产者向主题 testing
发送一些消息,而我使用 bin/kafka-console-producer.sh --topic testing --bootstrap-server localhost:9092
运行,则从 Kafka 到 JMS 的路由工作正常。所以这些链接的 Camel 路线存在一些问题。
在 Camel pom.xml 中 Spring 引导 camel-kafka-starter 和 camel-jms-starter 依赖项。
当我启动 Spring Boot Camel with Maven 并将一些消息从 Kafka 生产者发送到 Kafka broker testing
主题时,我可以看到 Kafka broker 收到了消息并且上面的日志打印正常.
第 .to("jms:topic:test.sink.topic");
行出现错误,我不知道这是什么意思。
ERROR 28642 --- [aConsumer[testing]] o.a.c.p.e.DefaultErrorHandler: Failed delivery for (MessageId: ID-PCID on ExchangeId: ID-PCID). Exhausted after delivery attempt: 1 caught: org.springframework.jms.UncategorizedJmsException: Uncategorized exception occurred during JMS processing; nested exception is javax.jms.JMSException: AMQ139015: Illegal deliveryMode value: 0
org.springframework.jms.UncategorizedJmsException: Uncategorized exception occurred during JMS processing; nested exception is javax.jms.JMSException: AMQ139015: Illegal deliveryMode value: 0
at org.springframework.jms.support.JmsUtils.convertJmsAccessException(JmsUtils.java:311) ~[spring-jms-5.2.6.RELEASE.jar:5.2.6.RELEASE]
at org.springframework.jms.support.JmsAccessor.convertJmsAccessException(JmsAccessor.java:185) ~[spring-jms-5.2.6.RELEASE.jar:5.2.6.RELEASE]
at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:507) ~[spring-jms-5.2.6.RELEASE.jar:5.2.6.RELEASE]
at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.send(JmsConfiguration.java:525) ~[camel-jms-3.4.0.jar:3.4.0]
at org.apache.camel.component.jms.JmsProducer.doSend(JmsProducer.java:438) ~[camel-jms-3.4.0.jar:3.4.0]
at org.apache.camel.component.jms.JmsProducer.processInOnly(JmsProducer.java:392) ~[camel-jms-3.4.0.jar:3.4.0]
at org.apache.camel.component.jms.JmsProducer.process(JmsProducer.java:155) ~[camel-jms-3.4.0.jar:3.4.0]
at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:168) ~[camel-base-3.4.0.jar:3.4.0]
at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$SimpleTask.run(RedeliveryErrorHandler.java:395) ~[camel-base-3.4.0.jar:3.4.0]
at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:148) ~[camel-base-3.4.0.jar:3.4.0]
at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:60) ~[camel-base-3.4.0.jar:3.4.0]
at org.apache.camel.processor.Pipeline.process(Pipeline.java:147) ~[camel-base-3.4.0.jar:3.4.0]
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:286) ~[camel-base-3.4.0.jar:3.4.0]
at org.apache.camel.impl.engine.DefaultAsyncProcessorAwaitManager.process(DefaultAsyncProcessorAwaitManager.java:83) ~[camel-base-3.4.0.jar:3.4.0]
at org.apache.camel.support.AsyncProcessorSupport.process(AsyncProcessorSupport.java:40) ~[camel-support-3.4.0.jar:3.4.0]
at org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.doRun(KafkaConsumer.java:346) ~[camel-kafka-3.4.0.jar:3.4.0]
at org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.run(KafkaConsumer.java:222) ~[camel-kafka-3.4.0.jar:3.4.0]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
Caused by: javax.jms.JMSException: AMQ139015: Illegal deliveryMode value: 0
at org.apache.activemq.artemis.jms.client.ActiveMQMessage.setJMSDeliveryMode(ActiveMQMessage.java:450) ~[artemis-jms-client-2.12.0.jar:2.12.0]
at org.apache.camel.component.jms.JmsMessageHelper.setJMSDeliveryMode(JmsMessageHelper.java:435) ~[camel-jms-3.4.0.jar:3.4.0]
at org.apache.camel.component.jms.JmsBinding.appendJmsProperty(JmsBinding.java:393) ~[camel-jms-3.4.0.jar:3.4.0]
at org.apache.camel.component.jms.JmsBinding.appendJmsProperties(JmsBinding.java:371) ~[camel-jms-3.4.0.jar:3.4.0]
at org.apache.camel.component.jms.JmsBinding.makeJmsMessage(JmsBinding.java:346) ~[camel-jms-3.4.0.jar:3.4.0]
at org.apache.camel.component.jms.JmsProducer.createMessage(JmsProducer.java:325) ~[camel-jms-3.4.0.jar:3.4.0]
at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.doSendToDestination(JmsConfiguration.java:561) ~[camel-jms-3.4.0.jar:3.4.0]
at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.lambda$send[=11=](JmsConfiguration.java:527) ~[camel-jms-3.4.0.jar:3.4.0]
at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:504) ~[spring-jms-5.2.6.RELEASE.jar:5.2.6.RELEASE]
... 19 common frames omitted
当从 JMS 向 Kafka 发送一些消息时,路由工作正常。
这是相关的错误信息:
AMQ139015: Illegal deliveryMode value: 0
此错误消息表示已调用 JMS 方法 javax.jms.Message#setJMSDeliveryMode
with a value of 0
. The value of 0
is not valid. Valid delivery mode values are defined by javax.jms.DeliveryMode
:
stack-trace 表示这个无效值是由 Camel 根据输入消息设置的。参见 this code。为了解决这个问题,您需要确定 header JMSDeliveryMode
在 org.apache.camel.Message
.