为什么 camel route 在调用 Message Consumption Listener 后给出 Failed to send reply with payload 错误?
Why does camel route give Failed to send reply with payload error after calling the Message Consumption Listener?
我在我的文件路由生成器中定义了一个骆驼路由,如下所示:
from("timer:first-timer") .bean(new MsgConsumptionListener(),"receiveMessage")
.log(">>>> Body content is :::::::: ---> ${body}")
.log(">>> Name is ::::::: ------> ${body.Name}")
.setHeader("Name",method(MsgConsumptionListener.class))
.to("log:first-timer");
在 MsgConsumptionListener.java 中,我正在使用这样的消息:
@Async
@JmsListener(destination = "MyQueue")
public String receiveMessage(final Message jsonMessage) throws JMSException {
String messageData = null;
log.info("Message has been received----->" +jsonMessage);
String rcvrName = null;
if(jsonMessage instanceof TextMessage) {
TextMessage textMessage = (TextMessage)jsonMessage;
messageData = textMessage.getText();
Map map = new Gson().fromJson(messageData, Map.class);
rcvrName = (String) map.get("Name");
log.info("Receiver Name----->" +rcvrName);
}
return rcvrName;
}
我可以看到消息正在被消耗,但是 camel 路由在调用侦听器时出现以下错误:
c.f.i.listener.MsgConsumptionListener : Message has been received----->ActiveMQTextMessage {commandId = 5, responseRequired = true, destination = queue://MyQueue, transactionId = null, expiration = 0, timestamp = 1644257093499, arrival = 0, brokerInTime = 1644257093500, brokerOutTime = 1644298154164, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@4d7cdcac, marshalledProperties = null, dataStructure = null, redeliveryCounter = 1, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false, text = {"Name":"Kunal"}}
c.f.i.listener.MsgConsumptionListener : Receiver Name----->Kunal
[ main] o.a.c.impl.engine.AbstractCamelContext : Routes startup summary (total:1 started:1)
[ main] o.a.c.impl.engine.AbstractCamelContext : Started route1 (timer://first-timer)
[ main] o.a.c.impl.engine.AbstractCamelContext : Apache Camel 3.11.2 (camel-1) started in 724ms (build:72ms init:622ms start:30ms)
o.s.j.l.DefaultMessageListenerContainer : Execution of JMS message listener failed, and no ErrorHandler has been set.
org.springframework.jms.listener.adapter.ReplyFailureException: Failed to send reply with payload [bp_exchange.in]; nested exception is javax.jms.InvalidDestinationException: Cannot determine response destination: Request message does not contain reply-to destination, and no default response destination set.
[r://first-timer] c.f.i.listener.FileEventListener : Message has been received----->null
[r://first-timer] route1 : >>>
[r://first-timer] c.f.i.listener.FileEventListener : Message has been received----->null
[r://first-timer] route1 : >>> The file Name set in the header is :
[r://first-timer] c.f.i.listener.FileEventListener : Message has been received----->null
所以骆驼路由无法获取消息属性名称并将其设置在 header 中。
有谁知道问题是什么以及我应该如何获取路由中的消息属性?
我的 objective 是 使用文件路由构建器调用我的 MsgConsumption 侦听器,每当消息被放入 queue 时,我想要路由触发侦听器,使用消息并从消息中获取属性名称以用于进一步处理。
我不是 Spring JMS 专家,但您似乎将 Spring JMS 侦听器行为与 Camel 路由行为混合在一起,这是行不通的。如果我理解你上面的 objective,你希望路由选择排队的消息并处理它。如果那是真的,我建议不要使用 Spring JMS 侦听器并删除路由中的 from(timer) 并将其更改为 from(activemq) 并采用当前侦听器中的逻辑来检查消息并提取命名并将其放入路由调用的 bean 中。喜欢:
from("activemq:myqueue")
.bean(GetTheNameBean)
.to("log:message")
我在我的文件路由生成器中定义了一个骆驼路由,如下所示:
from("timer:first-timer") .bean(new MsgConsumptionListener(),"receiveMessage")
.log(">>>> Body content is :::::::: ---> ${body}")
.log(">>> Name is ::::::: ------> ${body.Name}")
.setHeader("Name",method(MsgConsumptionListener.class))
.to("log:first-timer");
在 MsgConsumptionListener.java 中,我正在使用这样的消息:
@Async
@JmsListener(destination = "MyQueue")
public String receiveMessage(final Message jsonMessage) throws JMSException {
String messageData = null;
log.info("Message has been received----->" +jsonMessage);
String rcvrName = null;
if(jsonMessage instanceof TextMessage) {
TextMessage textMessage = (TextMessage)jsonMessage;
messageData = textMessage.getText();
Map map = new Gson().fromJson(messageData, Map.class);
rcvrName = (String) map.get("Name");
log.info("Receiver Name----->" +rcvrName);
}
return rcvrName;
}
我可以看到消息正在被消耗,但是 camel 路由在调用侦听器时出现以下错误:
c.f.i.listener.MsgConsumptionListener : Message has been received----->ActiveMQTextMessage {commandId = 5, responseRequired = true, destination = queue://MyQueue, transactionId = null, expiration = 0, timestamp = 1644257093499, arrival = 0, brokerInTime = 1644257093500, brokerOutTime = 1644298154164, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@4d7cdcac, marshalledProperties = null, dataStructure = null, redeliveryCounter = 1, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false, text = {"Name":"Kunal"}}
c.f.i.listener.MsgConsumptionListener : Receiver Name----->Kunal
[ main] o.a.c.impl.engine.AbstractCamelContext : Routes startup summary (total:1 started:1)
[ main] o.a.c.impl.engine.AbstractCamelContext : Started route1 (timer://first-timer)
[ main] o.a.c.impl.engine.AbstractCamelContext : Apache Camel 3.11.2 (camel-1) started in 724ms (build:72ms init:622ms start:30ms)
o.s.j.l.DefaultMessageListenerContainer : Execution of JMS message listener failed, and no ErrorHandler has been set.
org.springframework.jms.listener.adapter.ReplyFailureException: Failed to send reply with payload [bp_exchange.in]; nested exception is javax.jms.InvalidDestinationException: Cannot determine response destination: Request message does not contain reply-to destination, and no default response destination set.
[r://first-timer] c.f.i.listener.FileEventListener : Message has been received----->null
[r://first-timer] route1 : >>>
[r://first-timer] c.f.i.listener.FileEventListener : Message has been received----->null
[r://first-timer] route1 : >>> The file Name set in the header is :
[r://first-timer] c.f.i.listener.FileEventListener : Message has been received----->null
所以骆驼路由无法获取消息属性名称并将其设置在 header 中。 有谁知道问题是什么以及我应该如何获取路由中的消息属性?
我的 objective 是 使用文件路由构建器调用我的 MsgConsumption 侦听器,每当消息被放入 queue 时,我想要路由触发侦听器,使用消息并从消息中获取属性名称以用于进一步处理。
我不是 Spring JMS 专家,但您似乎将 Spring JMS 侦听器行为与 Camel 路由行为混合在一起,这是行不通的。如果我理解你上面的 objective,你希望路由选择排队的消息并处理它。如果那是真的,我建议不要使用 Spring JMS 侦听器并删除路由中的 from(timer) 并将其更改为 from(activemq) 并采用当前侦听器中的逻辑来检查消息并提取命名并将其放入路由调用的 bean 中。喜欢:
from("activemq:myqueue")
.bean(GetTheNameBean)
.to("log:message")