骆驼-目录中有多个文件时启动路由
Camel - start route when multiple files in directory
我想在目录中有 4 个具有特定名称的文件时启动骆驼路线。当file1.csv.sem、file2.csv.sem、file3.csv.sem、file4.csv.sem开始骆驼路线
<from uri="file:{{directory.path}}?include=file1.*.sem"/>
我希望当这 4 个文件在目录中时,camel 路由将启动。
您可以创建使用自定义处理器的计时器或计划路线,该处理器使用 File.list()
或 File.listFiles()
检查文件夹的内容。如果所有必需的文件都存在,则处理器设置 body、header 或 属性 以发出路由应该继续的信号。
之后您可以使用 PollEnrich 单独使用每个文件并将它们存储到 header 或 属性 以备后用。
String filePath = getContext()
.resolvePropertyPlaceholders("{{input.path}}");
String[] requiredFiles = new String[]{
"file1.csv.sem",
"file2.csv.sem",
"file3.csv.sem"
};
// Poll input folder every 5 seconds for required files
from("timer:timerName?period=5000")
.routeId("multifiles")
.process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
for (int i = 0; i < requiredFiles.length; i++) {
File file = new File(filePath + File.separator + requiredFiles[i]);
if(!file.exists()){
exchange.getMessage().setBody(false);
return;
}
}
exchange.getMessage().setBody(true);
}
})
.filter(body().isEqualTo(false))
.log("Missing files..")
.stop()
.end()
.to("direct:processFileGroup");
from("direct:processFileGroup")
.routeId("processFileGroup")
.setBody(constant(Arrays.asList(requiredFiles)))
// Read required files and aggregate to a list
.split(body(), AggregationStrategies.groupedBody())
.log("${body}")
.pollEnrich()
.simple("file:{{input.path}}?fileName=${body}")
.timeout(1000)
.end()
.end()
// Do stuff with the List of Exchange(s) in body
.process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
List<GenericFile> files = exchange.getMessage().getBody(List.class);
for (int i = 0; i < files.size(); i++) {
System.out.println(files.get(i).getFileName());
}
}
});
或者,您可以将文件组件与一些自定义聚合策略一起使用,或者使用 File-watch 一些诡计,但至少对我来说这些听起来更复杂。
这是一个相当普遍的问题,所以可能有某种 EIP 可以更优雅地处理这个问题,但还没有偶然发现这样的问题。
我想在目录中有 4 个具有特定名称的文件时启动骆驼路线。当file1.csv.sem、file2.csv.sem、file3.csv.sem、file4.csv.sem开始骆驼路线
<from uri="file:{{directory.path}}?include=file1.*.sem"/>
我希望当这 4 个文件在目录中时,camel 路由将启动。
您可以创建使用自定义处理器的计时器或计划路线,该处理器使用 File.list()
或 File.listFiles()
检查文件夹的内容。如果所有必需的文件都存在,则处理器设置 body、header 或 属性 以发出路由应该继续的信号。
之后您可以使用 PollEnrich 单独使用每个文件并将它们存储到 header 或 属性 以备后用。
String filePath = getContext()
.resolvePropertyPlaceholders("{{input.path}}");
String[] requiredFiles = new String[]{
"file1.csv.sem",
"file2.csv.sem",
"file3.csv.sem"
};
// Poll input folder every 5 seconds for required files
from("timer:timerName?period=5000")
.routeId("multifiles")
.process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
for (int i = 0; i < requiredFiles.length; i++) {
File file = new File(filePath + File.separator + requiredFiles[i]);
if(!file.exists()){
exchange.getMessage().setBody(false);
return;
}
}
exchange.getMessage().setBody(true);
}
})
.filter(body().isEqualTo(false))
.log("Missing files..")
.stop()
.end()
.to("direct:processFileGroup");
from("direct:processFileGroup")
.routeId("processFileGroup")
.setBody(constant(Arrays.asList(requiredFiles)))
// Read required files and aggregate to a list
.split(body(), AggregationStrategies.groupedBody())
.log("${body}")
.pollEnrich()
.simple("file:{{input.path}}?fileName=${body}")
.timeout(1000)
.end()
.end()
// Do stuff with the List of Exchange(s) in body
.process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
List<GenericFile> files = exchange.getMessage().getBody(List.class);
for (int i = 0; i < files.size(); i++) {
System.out.println(files.get(i).getFileName());
}
}
});
或者,您可以将文件组件与一些自定义聚合策略一起使用,或者使用 File-watch 一些诡计,但至少对我来说这些听起来更复杂。
这是一个相当普遍的问题,所以可能有某种 EIP 可以更优雅地处理这个问题,但还没有偶然发现这样的问题。