我如何使用 apache-camel 在一次轮询中读取 aws s3 目录中的所有文件

How do i can read all the files from aws s3 directory in a single poll using apache-camel

我正在努力实现:

  1. 从s3目录读取所有文件。
  2. 将所有文件复制到 s3 上的备份目录。
  3. 将所有文件内容聚合到一个文件中,并将其复制到 s3 上的另一个目录。

但我坚持第一个要点,即在一次轮询中读取所有文件。

my from router : aws-s3://${camel.bucket.name}?amazonS3Client=#s3Client&prefix=some_path_on_s3&deleteAfterRead=true&delay=100s

for example if, some_path_on_s3 -> has 2 files say first.txt and 
second.txt

according to camel documentation, it has to read both the files in a 
single poll, but is reading 1 file per poll.

I also tried with parameter,  maxMessagesPerPoll=2 but no luck. It 
still reads one file per poll.

有没有办法在一次轮询中从 s3 目录中获取所有文件?

事实是它一次将一个文件发送到路由,但它会在每次轮询时确认整个批次。

maxMessagesPerPoll 仅限制每批读取的文件数。我认为您正在寻找的信息在每个交易所的骆驼批头上:

CamelBatchComplete:一个布尔值,指示批次中的最后一个交换。仅适用于最后一个条目。

CamelBatchIndex:批次的当前索引。从 0 开始。

CamelBatchSize:此批次中轮询的 Exchange 总数。

使用此信息,您可以多播消息,然后实现一个聚合器以在一条路径上加入文件,一旦 CamelBatchComplete=true,并在另一条路径上备份文件。

在此处查找更多信息:

Batch consumer

Multicast

  • 我在这里工作,

    from("file://<some_path_to_dir>")
    .routeId("some_route_id")
    .to("backup_dir")
    .to("direct:aggregate")
    .end();
    
    
    
    from("direct:aggregate")
    .routeId("aggregate_router")
    .aggregate(constant(true), new GroupedExchangeAggregationStrategy())
            .completionPredicate(exchange -> {
                List<Exchange> list = exchange.getProperty(Exchange.GROUPED_EXCHANGE, List.class);
                Exchange latestExchange = list.get(list.size() - 1);
                return (boolean) latestExchange.getProperty(Exchange.BATCH_COMPLETE);
            })
    .to("direct:merge");
    
    
    from("direct:merge")
            .routeId("merge_router")
            .process(new Processor() {
                @Override
                public void process(Exchange exchange) throws Exception {
                    List<Exchange> list = exchange.getProperty(Exchange.GROUPED_EXCHANGE, List.class);
                    StringBuilder builder = new StringBuilder();
                    for(Exchange ex : list){
                        builder.append(ex.getIn().getBody(String.class));
                    }
    
                    exchange.getIn().setBody(builder.toString());
                    // set any other necessary header if required here
                    // example, if aws s3 is the endpoint, set the S3Constants.KEY header here
                }
            })
    .to("some_final_endpoint");