org.apache.camel.FailedToCreateConsumerException:无法为端点创建消费者:direct://validateFile
org.apache.camel.FailedToCreateConsumerException: Failed to create Consumer for endpoint: direct://validateFile
我在针对 headers 断言时遇到无法创建消费者端点的问题。有人可以帮我解决这个问题吗?
我有这个文件观察器路由,它使用文件并验证文件。如果其有效文件将更新 headers 并将其发送到 s3 blob。否则它会将其发送到错误目录。
这是我的路线:
from("file-watch:test?events=CREATE&useFileHashing=true&antInclude=**/*.txt&recursive=true")
.process(fileProcessor)
.toD("direct:validateFile")
.choice()
.when(exchange -> exchange.getIn().getHeader("isValid").equals("valid"))
.to("direct:updateMessageHeaders")
.otherwise()
.toD("direct:processErrorFiles")
.endChoice()
.end();
from("direct:validateFile")
.routeId("validateFile")
.choice()
.when(exchange -> Long.parseLong(exchange.getIn().getHeader("fileSize").toString()) > 0)
.setHeader("isValid", simple("valid"))
.otherwise()
.setHeader("isValid", simple("invalid"))
.endChoice()
.end();
final Processor fileProcessor = exchange -> {
String fileName = exchange.getIn().getHeader("CamelFileAbsolutePath").toString();
File gdisFile = new File(fileName);
exchange.getIn().setHeader("fileSize", gdisFile.length());
};
测试用例
public class RouteTest extends CamelTestSupport {
@Override
public RouteBuilder createRouteBuilder() throws Exception
{
return new Route();
}
@Test
public void header_validation() {
Map<String, Object> headers = new HashMap<>();
headers.put("fileSize", 100);
template.sendBodyAndHeaders("direct:validateFile", null, headers);
assertEquals("valid",consumer.receive("direct:validateFile").getIn().getHeader("isValid"));
}
}
异常
org.apache.camel.FailedToCreateConsumerException: Failed to create Consumer for endpoint: direct://validateFile. Reason: java.lang.IllegalArgumentException: Cannot add a 2nd consumer to the same endpoint: direct://validateFile. DirectEndpoint only allows one consumer.
at org.apache.camel.support.cache.DefaultConsumerCache.acquirePollingConsumer(DefaultConsumerCache.java:107)
使用 requestBodyAndHeader 而不是 sendBodyAndHeader,这样您就可以得到作为 return 值的响应,然后您可以断言它。
当你打电话时
template.sendBodyAndHeaders
Camel 执行 fire-and-forget - 它将您的数据发送到端点并且不等待结果。
你需要使用
tamplate.requestBodyAndHeaders
此方法等待数据通过所有路由,然后returns将结果返回给您。
我在针对 headers 断言时遇到无法创建消费者端点的问题。有人可以帮我解决这个问题吗?
我有这个文件观察器路由,它使用文件并验证文件。如果其有效文件将更新 headers 并将其发送到 s3 blob。否则它会将其发送到错误目录。
这是我的路线:
from("file-watch:test?events=CREATE&useFileHashing=true&antInclude=**/*.txt&recursive=true")
.process(fileProcessor)
.toD("direct:validateFile")
.choice()
.when(exchange -> exchange.getIn().getHeader("isValid").equals("valid"))
.to("direct:updateMessageHeaders")
.otherwise()
.toD("direct:processErrorFiles")
.endChoice()
.end();
from("direct:validateFile")
.routeId("validateFile")
.choice()
.when(exchange -> Long.parseLong(exchange.getIn().getHeader("fileSize").toString()) > 0)
.setHeader("isValid", simple("valid"))
.otherwise()
.setHeader("isValid", simple("invalid"))
.endChoice()
.end();
final Processor fileProcessor = exchange -> {
String fileName = exchange.getIn().getHeader("CamelFileAbsolutePath").toString();
File gdisFile = new File(fileName);
exchange.getIn().setHeader("fileSize", gdisFile.length());
};
测试用例
public class RouteTest extends CamelTestSupport {
@Override
public RouteBuilder createRouteBuilder() throws Exception
{
return new Route();
}
@Test
public void header_validation() {
Map<String, Object> headers = new HashMap<>();
headers.put("fileSize", 100);
template.sendBodyAndHeaders("direct:validateFile", null, headers);
assertEquals("valid",consumer.receive("direct:validateFile").getIn().getHeader("isValid"));
}
}
异常
org.apache.camel.FailedToCreateConsumerException: Failed to create Consumer for endpoint: direct://validateFile. Reason: java.lang.IllegalArgumentException: Cannot add a 2nd consumer to the same endpoint: direct://validateFile. DirectEndpoint only allows one consumer.
at org.apache.camel.support.cache.DefaultConsumerCache.acquirePollingConsumer(DefaultConsumerCache.java:107)
使用 requestBodyAndHeader 而不是 sendBodyAndHeader,这样您就可以得到作为 return 值的响应,然后您可以断言它。
当你打电话时
template.sendBodyAndHeaders
Camel 执行 fire-and-forget - 它将您的数据发送到端点并且不等待结果。
你需要使用
tamplate.requestBodyAndHeaders
此方法等待数据通过所有路由,然后returns将结果返回给您。