骆驼路由无法按预期工作
Camel routing not working as expected
我尝试在一个兔子队列和另一个兔子队列之间创建一条非常简单的路线。消息应该从一个队列到第二个队列不进行任何处理。但由于未知原因,消息一次又一次地重定向到第一个队列,而不是转到第二个队列。
@Component
public class CamelRouter extends SpringRouteBuilder {
@Override
public void configure() {
from("rabbitmq://localhost/test-in?autoAck=false&autoDelete=false&durable=true&exchangeType=fanout&queue=test-in&username=guest&password=xxx")
.log(LoggingLevel.ERROR, "Output of message from Queue: ${in.body}")
.to("rabbitmq://localhost/test-out?autoAck=false&autoDelete=false&durable=true&exchangeType=fanout&queue=test-out&username=guest&password=xxx");
}
}
日志如下:
09:04:18.564 [thread] WARN route1 - Output of message from Queue: test
09:04:18.700 [thread] WARN route1 - Output of message from Queue: test
09:04:18.835 [thread] WARN route1 - Output of message from Queue: test
09:04:18.968 [thread] WARN route1 - Output of message from Queue: test
09:04:19.104 [thread] WARN route1 - Output of message from Queue: test
09:04:19.238 [thread] WARN route1 - Output of message from Queue: test
这个骆驼配置有什么问题?我认为越简单越好。
请试试这个:
@Component
public class CamelRouter extends SpringRouteBuilder {
@Override
public void configure() {
from("rabbitmq://localhost/test-in?autoAck=false&autoDelete=false&durable=true&exchangeType=fanout&queue=test-in&username=guest&password=xxx")
.removeHeaders("rabbitmq.*")
.log(LoggingLevel.ERROR, "Output of message from Queue: ${in.body}")
.to("rabbitmq://localhost/test-out?autoAck=false&autoDelete=false&durable=true&exchangeType=fanout&queue=test-out&username=guest&password=xxx");
}
}
Camel 支持从消息 header 中覆盖某些设置,例如 queue(并且 RabbitMq 组件会这样做),因此我们需要删除它们以避免将消息发送回源queue。可以找到 rabitmq header 的完整列表 there。我猜 "rabbitmq.REPLY_TO" header 是有问题的。
与其删除 headers,不如在交换中使用 out 消息,如下所示。在这个特定的例子中,rabbitmq
前缀可能没问题,但是如果你尝试用其他组件使用这种方法(imap 是一个很好的例子)它不会因为各种奇怪的原因而工作。
from("rabbitmq://localhost/test-in?autoAck=false&autoDelete=false&durable=true&exchangeType=fanout&queue=test-in&username=guest&password=xxx")
.log(LoggingLevel.ERROR, "Output of message from Queue: ${in.body}")
.process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
exchange.getOut().setBody(exchange.getIn().getBody());
}
})
.to("rabbitmq://localhost/test-out?autoAck=false&autoDelete=false&durable=true&exchangeType=fanout&queue=test-out&username=guest&password=xxx");
我尝试在一个兔子队列和另一个兔子队列之间创建一条非常简单的路线。消息应该从一个队列到第二个队列不进行任何处理。但由于未知原因,消息一次又一次地重定向到第一个队列,而不是转到第二个队列。
@Component
public class CamelRouter extends SpringRouteBuilder {
@Override
public void configure() {
from("rabbitmq://localhost/test-in?autoAck=false&autoDelete=false&durable=true&exchangeType=fanout&queue=test-in&username=guest&password=xxx")
.log(LoggingLevel.ERROR, "Output of message from Queue: ${in.body}")
.to("rabbitmq://localhost/test-out?autoAck=false&autoDelete=false&durable=true&exchangeType=fanout&queue=test-out&username=guest&password=xxx");
}
}
日志如下:
09:04:18.564 [thread] WARN route1 - Output of message from Queue: test
09:04:18.700 [thread] WARN route1 - Output of message from Queue: test
09:04:18.835 [thread] WARN route1 - Output of message from Queue: test
09:04:18.968 [thread] WARN route1 - Output of message from Queue: test
09:04:19.104 [thread] WARN route1 - Output of message from Queue: test
09:04:19.238 [thread] WARN route1 - Output of message from Queue: test
这个骆驼配置有什么问题?我认为越简单越好。
请试试这个:
@Component
public class CamelRouter extends SpringRouteBuilder {
@Override
public void configure() {
from("rabbitmq://localhost/test-in?autoAck=false&autoDelete=false&durable=true&exchangeType=fanout&queue=test-in&username=guest&password=xxx")
.removeHeaders("rabbitmq.*")
.log(LoggingLevel.ERROR, "Output of message from Queue: ${in.body}")
.to("rabbitmq://localhost/test-out?autoAck=false&autoDelete=false&durable=true&exchangeType=fanout&queue=test-out&username=guest&password=xxx");
}
}
Camel 支持从消息 header 中覆盖某些设置,例如 queue(并且 RabbitMq 组件会这样做),因此我们需要删除它们以避免将消息发送回源queue。可以找到 rabitmq header 的完整列表 there。我猜 "rabbitmq.REPLY_TO" header 是有问题的。
与其删除 headers,不如在交换中使用 out 消息,如下所示。在这个特定的例子中,rabbitmq
前缀可能没问题,但是如果你尝试用其他组件使用这种方法(imap 是一个很好的例子)它不会因为各种奇怪的原因而工作。
from("rabbitmq://localhost/test-in?autoAck=false&autoDelete=false&durable=true&exchangeType=fanout&queue=test-in&username=guest&password=xxx")
.log(LoggingLevel.ERROR, "Output of message from Queue: ${in.body}")
.process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
exchange.getOut().setBody(exchange.getIn().getBody());
}
})
.to("rabbitmq://localhost/test-out?autoAck=false&autoDelete=false&durable=true&exchangeType=fanout&queue=test-out&username=guest&password=xxx");