我如何使用 apache-camel 在一次轮询中读取 aws s3 目录中的所有文件
How do i can read all the files from aws s3 directory in a single poll using apache-camel
我正在努力实现:
- 从s3目录读取所有文件。
- 将所有文件复制到 s3 上的备份目录。
- 将所有文件内容聚合到一个文件中,并将其复制到
s3 上的另一个目录。
但我坚持第一个要点,即在一次轮询中读取所有文件。
my from router :
aws-s3://${camel.bucket.name}?amazonS3Client=#s3Client&prefix=some_path_on_s3&deleteAfterRead=true&delay=100s
for example if, some_path_on_s3 -> has 2 files say first.txt and
second.txt
according to camel documentation, it has to read both the files in a
single poll, but is reading 1 file per poll.
I also tried with parameter, maxMessagesPerPoll=2 but no luck. It
still reads one file per poll.
有没有办法在一次轮询中从 s3 目录中获取所有文件?
事实是它一次将一个文件发送到路由,但它会在每次轮询时确认整个批次。
maxMessagesPerPoll 仅限制每批读取的文件数。我认为您正在寻找的信息在每个交易所的骆驼批头上:
CamelBatchComplete:一个布尔值,指示批次中的最后一个交换。仅适用于最后一个条目。
CamelBatchIndex:批次的当前索引。从 0 开始。
CamelBatchSize:此批次中轮询的 Exchange 总数。
使用此信息,您可以多播消息,然后实现一个聚合器以在一条路径上加入文件,一旦 CamelBatchComplete=true,并在另一条路径上备份文件。
在此处查找更多信息:
我在这里工作,
from("file://<some_path_to_dir>")
.routeId("some_route_id")
.to("backup_dir")
.to("direct:aggregate")
.end();
from("direct:aggregate")
.routeId("aggregate_router")
.aggregate(constant(true), new GroupedExchangeAggregationStrategy())
.completionPredicate(exchange -> {
List<Exchange> list = exchange.getProperty(Exchange.GROUPED_EXCHANGE, List.class);
Exchange latestExchange = list.get(list.size() - 1);
return (boolean) latestExchange.getProperty(Exchange.BATCH_COMPLETE);
})
.to("direct:merge");
from("direct:merge")
.routeId("merge_router")
.process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
List<Exchange> list = exchange.getProperty(Exchange.GROUPED_EXCHANGE, List.class);
StringBuilder builder = new StringBuilder();
for(Exchange ex : list){
builder.append(ex.getIn().getBody(String.class));
}
exchange.getIn().setBody(builder.toString());
// set any other necessary header if required here
// example, if aws s3 is the endpoint, set the S3Constants.KEY header here
}
})
.to("some_final_endpoint");
我正在努力实现:
- 从s3目录读取所有文件。
- 将所有文件复制到 s3 上的备份目录。
- 将所有文件内容聚合到一个文件中,并将其复制到 s3 上的另一个目录。
但我坚持第一个要点,即在一次轮询中读取所有文件。
my from router :
aws-s3://${camel.bucket.name}?amazonS3Client=#s3Client&prefix=some_path_on_s3&deleteAfterRead=true&delay=100s
for example if, some_path_on_s3 -> has 2 files say first.txt and
second.txt
according to camel documentation, it has to read both the files in a
single poll, but is reading 1 file per poll.
I also tried with parameter, maxMessagesPerPoll=2 but no luck. It
still reads one file per poll.
有没有办法在一次轮询中从 s3 目录中获取所有文件?
事实是它一次将一个文件发送到路由,但它会在每次轮询时确认整个批次。
maxMessagesPerPoll 仅限制每批读取的文件数。我认为您正在寻找的信息在每个交易所的骆驼批头上:
CamelBatchComplete:一个布尔值,指示批次中的最后一个交换。仅适用于最后一个条目。
CamelBatchIndex:批次的当前索引。从 0 开始。
CamelBatchSize:此批次中轮询的 Exchange 总数。
使用此信息,您可以多播消息,然后实现一个聚合器以在一条路径上加入文件,一旦 CamelBatchComplete=true,并在另一条路径上备份文件。
在此处查找更多信息:
我在这里工作,
from("file://<some_path_to_dir>") .routeId("some_route_id") .to("backup_dir") .to("direct:aggregate") .end(); from("direct:aggregate") .routeId("aggregate_router") .aggregate(constant(true), new GroupedExchangeAggregationStrategy()) .completionPredicate(exchange -> { List<Exchange> list = exchange.getProperty(Exchange.GROUPED_EXCHANGE, List.class); Exchange latestExchange = list.get(list.size() - 1); return (boolean) latestExchange.getProperty(Exchange.BATCH_COMPLETE); }) .to("direct:merge"); from("direct:merge") .routeId("merge_router") .process(new Processor() { @Override public void process(Exchange exchange) throws Exception { List<Exchange> list = exchange.getProperty(Exchange.GROUPED_EXCHANGE, List.class); StringBuilder builder = new StringBuilder(); for(Exchange ex : list){ builder.append(ex.getIn().getBody(String.class)); } exchange.getIn().setBody(builder.toString()); // set any other necessary header if required here // example, if aws s3 is the endpoint, set the S3Constants.KEY header here } }) .to("some_final_endpoint");