骆驼 ZipFileDataFormat 拆分在流式传输时未设置 headers

Camel ZipFileDataFormat splitting doesn't set the headers when streaming

我有一个简单的路由,它从 FTP 服务器轮询 zip 文件。 zip 文件由一个需要处理的文件和零个或多个附件组成。 我正在尝试使用 ZipFileDataFormat 进行拆分,我能够根据需要拆分和路由项目,即将处理文件发送到处理器,将其他文件发送到聚合器端点。

路线如下所示:

from(sftp://username@server/folder/path?password=password&delay=600000)
.unmarshal(getZipFileDataFormat()).split(body(Iterator.class)).streaming()
.log("CamelSplitComplete :: ${header.CamelSplitComplete}")
.log("Split Size :: ${header.CamelSplitSize}")
 .choice()
   .when(header(MyConstants.CAMEL_FILE_NAME_HEADER).contains(".json"))
     .to(JSON_ENDPOINT).endChoice()
   .otherwise()
     .to(AGGREGATOR_ENDPOINT)
 .endChoice()
.end();

getZipFileDataFormat

private ZipFileDataFormat getZipFileDataFormat() {
        ZipFileDataFormat zipFile = new ZipFileDataFormat();
        zipFile.setUsingIterator(true);
        return zipFile;
    }

The splitting works fine. However, I can see in the logs that the two headers CamelSplitComplete and CamelSplitSize are not set correctly. Where CamelSplitComplete is always false, CamelSplitSize is not having any value.

因此,我无法根据大小进行汇总。我正在使用 eagerCheckCompletion() 在聚合器路由中获取输入交换。我的聚合器路由如下所示。

from(AGGREGATOR_ENDPOINT).aggregate(new ZipAggregationStrategy()).constant(true)
.eagerCheckCompletion().completionSize(header("CamelSplitSize"))to("file:///tmp/").end();

我读到 Apache Documentation 这些 headers 总是设置的。我在这里错过了什么吗?任何指向正确方向的指针都会非常有帮助。

我能够走完整条上班路线。我必须添加一种 pre-processor 来设置一些必要的 headers (传出文件名和 zip 的文件数)我需要进行聚合。

from(sftp://username@server/folder/path?password=password&delay=600000).to("file:///tmp/")
.beanRef("headerProcessor").unmarshal(getZipFileDataFormat())
.split(body(Iterator.class)).streaming()    
 .choice()
   .when(header(Exchange.FILE_NAME).contains(".json"))
     .to(JSON_ENDPOINT).endChoice()
   .otherwise()
    .to(AGGREGATOR_ENDPOINT)
 .endChoice()
.end();

之后,zip 聚合策略按预期工作。把聚合路由放在这里只是为了完成答案。

from(AGGREGATOR_ENDPOINT)
  .aggregate(header(MyConstants.HEADER_OUTGOING_FILE_NAME), new ZipAggregationStrategy())
  .eagerCheckCompletion().completionSize(header(MyConstants.HEADER_TOTAL_FILE_COUNT))
  .setHeader(Exchange.FILE_NAME, simple("${header.outgoingFileName}"))
 .to("file:///tmp/").end();