org.apache.camel.FailedToCreateConsumerException:无法为端点创建消费者:direct://validateFile

org.apache.camel.FailedToCreateConsumerException: Failed to create Consumer for endpoint: direct://validateFile

我在针对 headers 断言时遇到无法创建消费者端点的问题。有人可以帮我解决这个问题吗?

我有这个文件观察器路由,它使用文件并验证文件。如果其有效文件将更新 headers 并将其发送到 s3 blob。否则它会将其发送到错误目录。

这是我的路线:

  from("file-watch:test?events=CREATE&useFileHashing=true&antInclude=**/*.txt&recursive=true")
                .process(fileProcessor)
                .toD("direct:validateFile")
                .choice()
                .when(exchange -> exchange.getIn().getHeader("isValid").equals("valid"))
                .to("direct:updateMessageHeaders")
                .otherwise()
                .toD("direct:processErrorFiles")
                .endChoice()
                .end();

        from("direct:validateFile")
                .routeId("validateFile")
                .choice()
                .when(exchange -> Long.parseLong(exchange.getIn().getHeader("fileSize").toString()) > 0)
                .setHeader("isValid", simple("valid"))
                .otherwise()
                .setHeader("isValid", simple("invalid"))
                .endChoice()
                .end();
 final Processor fileProcessor = exchange -> {
        String fileName = exchange.getIn().getHeader("CamelFileAbsolutePath").toString();
        File gdisFile = new File(fileName);
        exchange.getIn().setHeader("fileSize", gdisFile.length());
    };

测试用例

public class RouteTest extends CamelTestSupport {

    @Override
    public RouteBuilder createRouteBuilder() throws Exception
    {
        return new Route();
    }
    @Test
    public void header_validation() {

        Map<String, Object> headers = new HashMap<>();
        headers.put("fileSize", 100);

        template.sendBodyAndHeaders("direct:validateFile", null, headers);
assertEquals("valid",consumer.receive("direct:validateFile").getIn().getHeader("isValid"));
    }
}

异常

org.apache.camel.FailedToCreateConsumerException: Failed to create Consumer for endpoint: direct://validateFile. Reason: java.lang.IllegalArgumentException: Cannot add a 2nd consumer to the same endpoint: direct://validateFile. DirectEndpoint only allows one consumer.
at org.apache.camel.support.cache.DefaultConsumerCache.acquirePollingConsumer(DefaultConsumerCache.java:107)

使用 requestBodyAndHeader 而不是 sendBodyAndHeader,这样您就可以得到作为 return 值的响应,然后您可以断言它。

https://camel.apache.org/manual/producertemplate.html

当你打电话时

template.sendBodyAndHeaders

Camel 执行 fire-and-forget - 它将您的数据发送到端点并且不等待结果。

你需要使用

tamplate.requestBodyAndHeaders

此方法等待数据通过所有路由,然后returns将结果返回给您。